refactor_: do not use identify protocol, and actually use the nodes from the config for peer exchange (#5212)

This commit is contained in:
richΛrd 2024-05-27 09:43:47 -04:00 committed by GitHub
parent 19875ed9b5
commit 0a9cff25f4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -35,7 +35,6 @@ import (
"github.com/jellydator/ttlcache/v3"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/multiformats/go-multiaddr"
"google.golang.org/protobuf/proto"
@ -92,7 +91,6 @@ type ITelemetryClient interface {
// network, using its very own P2P communication layer.
type Waku struct {
node *node.WakuNode // reference to a libp2p waku node
identifyService identify.IDService
appDB *sql.DB
dnsAddressCache map[string][]dnsdisc.DiscoveredNode // Map to store the multiaddresses returned by dns discovery
@ -414,7 +412,7 @@ func (w *Waku) discoverAndConnectPeers() error {
fnApply := func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) {
defer wg.Done()
if len(d.PeerInfo.Addrs) != 0 {
go w.identifyAndConnect(w.ctx, w.cfg.LightClient, d.PeerInfo)
go w.connect(d.PeerInfo, wps.DNSDiscovery)
}
}
@ -441,53 +439,17 @@ func (w *Waku) discoverAndConnectPeers() error {
continue
}
go w.identifyAndConnect(w.ctx, w.cfg.LightClient, *peerInfo)
go w.connect(*peerInfo, wps.Static)
}
}
return nil
}
func (w *Waku) identifyAndConnect(ctx context.Context, isLightClient bool, peerInfo peer.AddrInfo) {
ctx, cancel := context.WithTimeout(ctx, 7*time.Second)
defer cancel()
err := w.node.Host().Connect(ctx, peerInfo)
if err != nil {
w.logger.Error("could not connect to peer", zap.Any("peer", peerInfo), zap.Error(err))
return
}
conns := w.node.Host().Network().ConnsToPeer(peerInfo.ID)
if len(conns) == 0 {
return // No connection
}
select {
case <-w.ctx.Done():
return
case <-w.identifyService.IdentifyWait(conns[0]):
if isLightClient {
err = w.node.Host().Network().ClosePeer(peerInfo.ID)
if err != nil {
w.logger.Error("could not close connections to peer", zap.Stringer("peer", peerInfo.ID), zap.Error(err))
}
return
}
supportedProtocols, err := w.node.Host().Peerstore().SupportsProtocols(peerInfo.ID, relay.WakuRelayID_v200)
if err != nil {
w.logger.Error("could not obtain protocols", zap.Stringer("peer", peerInfo.ID), zap.Error(err))
return
}
if len(supportedProtocols) == 0 {
err = w.node.Host().Network().ClosePeer(peerInfo.ID)
if err != nil {
w.logger.Error("could not close connections to peer", zap.Stringer("peer", peerInfo.ID), zap.Error(err))
}
}
}
func (w *Waku) connect(peerInfo peer.AddrInfo, origin wps.Origin) {
// Connection will be prunned eventually by the connection manager if needed
// The peer connector in go-waku uses Connect, so it will execute identify as part of its
w.node.AddDiscoveredPeer(peerInfo.ID, peerInfo.Addrs, origin, []string{w.cfg.DefaultShardPubsubTopic}, true)
}
func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) {
@ -546,41 +508,32 @@ func (w *Waku) runPeerExchangeLoop() {
case <-ticker.C:
w.logger.Info("Running peer exchange loop")
availablePeers := w.node.Host().Peerstore().Peers()
peersWithRelay := 0
for _, p := range availablePeers {
supportedProtocols, err := w.node.Host().Peerstore().SupportsProtocols(p, relay.WakuRelayID_v200)
if err != nil {
continue
}
if len(supportedProtocols) != 0 {
peersWithRelay++
}
}
peersToDiscover := w.cfg.DiscoveryLimit - peersWithRelay
if peersToDiscover <= 0 {
continue
}
// We select only the nodes discovered via DNS Discovery that support peer exchange
// We assume that those peers are running peer exchange according to infra config,
// If not, the peer selection process in go-waku will filter them out anyway
w.dnsAddressCacheLock.RLock()
var peers []peer.ID
for _, record := range w.dnsAddressCache {
for _, discoveredNode := range record {
if len(discoveredNode.PeerInfo.Addrs) == 0 {
continue
}
go w.identifyAndConnect(w.ctx, true, discoveredNode.PeerInfo)
// Attempt to connect to the peers.
// Peers will be added to the libp2p peer store thanks to identify
go w.connect(discoveredNode.PeerInfo, wps.DNSDiscovery)
peers = append(peers, discoveredNode.PeerID)
}
}
w.dnsAddressCacheLock.RUnlock()
err := w.node.PeerExchange().Request(w.ctx, peersToDiscover, peer_exchange.WithAutomaticPeerSelection())
if len(peers) != 0 {
err := w.node.PeerExchange().Request(w.ctx, w.cfg.DiscoveryLimit, peer_exchange.WithAutomaticPeerSelection(peers...))
if err != nil {
w.logger.Error("couldnt request peers via peer exchange", zap.Error(err))
}
}
}
}
}
func (w *Waku) getPubsubTopic(topic string) string {
@ -1169,14 +1122,6 @@ func (w *Waku) Start() error {
w.logger.Info("WakuV2 PeerID", zap.Stringer("id", w.node.Host().ID()))
idService, err := identify.NewIDService(w.node.Host())
if err != nil {
return err
}
idService.Start()
w.identifyService = idService
if err = w.discoverAndConnectPeers(); err != nil {
return fmt.Errorf("failed to add wakuv2 peers: %v", err)
}
@ -1317,15 +1262,10 @@ func (w *Waku) Stop() error {
w.envelopeCache.Stop()
err := w.identifyService.Close()
if err != nil {
return err
}
w.node.Stop()
if w.protectedTopicStore != nil {
err = w.protectedTopicStore.Close()
err := w.protectedTopicStore.Close()
if err != nil {
return err
}