fix: allow peer-selection without pubsubTopics

This commit is contained in:
Prem Chaitanya Prathi 2024-05-25 06:12:12 +05:30
parent 8115ec7013
commit 827dc3822d
No known key found for this signature in database
2 changed files with 18 additions and 19 deletions

View File

@ -130,7 +130,7 @@ func (c *PeerConnectionStrategy) consumeSubscription(s subscription) {
if len(c.host.Network().Peers()) < waku_proto.GossipSubDMin { if len(c.host.Network().Peers()) < waku_proto.GossipSubDMin {
triggerImmediateConnection = true triggerImmediateConnection = true
} }
c.logger.Debug("adding discovered peer", logging.HostID("peerID", p.AddrInfo.ID)) c.logger.Debug("adding discovered peer", logging.HostID("peerID", p.AddrInfo.ID), zap.Int64("origin", int64(p.Origin)))
c.pm.AddDiscoveredPeer(p, triggerImmediateConnection) c.pm.AddDiscoveredPeer(p, triggerImmediateConnection)
case <-time.After(1 * time.Second): case <-time.After(1 * time.Second):

View File

@ -67,7 +67,6 @@ func (pm *PeerManager) SelectRandom(criteria PeerSelectionCriteria) (peer.IDSlic
peerIDs = make(PeerSet) peerIDs = make(PeerSet)
} }
// if not found in serviceSlots or proto == WakuRelayIDv200 // if not found in serviceSlots or proto == WakuRelayIDv200
pm.logger.Debug("looking for peers in peerStore", zap.String("proto", string(criteria.Proto)))
filteredPeers, err := pm.FilterPeersByProto(criteria.SpecificPeers, criteria.ExcludePeers, criteria.Proto) filteredPeers, err := pm.FilterPeersByProto(criteria.SpecificPeers, criteria.ExcludePeers, criteria.Proto)
if err != nil { if err != nil {
return nil, err return nil, err
@ -130,14 +129,16 @@ func (pm *PeerManager) selectServicePeer(criteria PeerSelectionCriteria) (PeerSe
if len(criteria.PubsubTopics) == 0 || (len(criteria.PubsubTopics) == 1 && criteria.PubsubTopics[0] == "") { if len(criteria.PubsubTopics) == 0 || (len(criteria.PubsubTopics) == 1 && criteria.PubsubTopics[0] == "") {
return slot.getRandom(criteria.MaxPeers, criteria.ExcludePeers) return slot.getRandom(criteria.MaxPeers, criteria.ExcludePeers)
} else { //PubsubTopic based selection } else { //PubsubTopic based selection
keys := make([]peer.ID, 0, len(slot.m)) selectedPeers := make([]peer.ID, 0, len(slot.m))
for i := range slot.m { for i := range slot.m {
if PeerInSet(criteria.ExcludePeers, i) { if PeerInSet(criteria.ExcludePeers, i) {
continue continue
} }
keys = append(keys, i) selectedPeers = append(selectedPeers, i)
}
if len(criteria.PubsubTopics) > 0 {
selectedPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, selectedPeers...)
} }
selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, keys...)
tmpPeers, err := selectRandomPeers(selectedPeers, criteria.ExcludePeers, criteria.MaxPeers) tmpPeers, err := selectRandomPeers(selectedPeers, criteria.ExcludePeers, criteria.MaxPeers)
for tmpPeer := range tmpPeers { for tmpPeer := range tmpPeers {
peers[tmpPeer] = struct{}{} peers[tmpPeer] = struct{}{}
@ -145,12 +146,16 @@ func (pm *PeerManager) selectServicePeer(criteria PeerSelectionCriteria) (PeerSe
if err == nil && len(peers) == criteria.MaxPeers { if err == nil && len(peers) == criteria.MaxPeers {
return peers, nil return peers, nil
} else { } else {
pm.logger.Debug("discovering peers by pubsubTopic", zap.Strings("pubsubTopics", criteria.PubsubTopics)) if len(criteria.PubsubTopics) > 0 {
//Trigger on-demand discovery for this topic and connect to peer immediately.
//For now discover atleast 1 peer for the criteria pm.logger.Debug("discovering peers by pubsubTopic", zap.Strings("pubsubTopics", criteria.PubsubTopics))
pm.discoverPeersByPubsubTopics(criteria.PubsubTopics, criteria.Proto, criteria.Ctx, 1) //Trigger on-demand discovery for this topic and connect to peer immediately.
//Try to fetch peers again. //For now discover atleast 1 peer for the criteria
continue pm.discoverPeersByPubsubTopics(criteria.PubsubTopics, criteria.Proto, criteria.Ctx, 1)
//Try to fetch peers again.
continue
}
break
} }
} }
} }
@ -186,12 +191,7 @@ func (pm *PeerManager) SelectPeers(criteria PeerSelectionCriteria) (peer.IDSlice
if criteria.MaxPeers == 0 { if criteria.MaxPeers == 0 {
criteria.MaxPeers = 1 criteria.MaxPeers = 1
} }
excPeers := maps.Keys(criteria.ExcludePeers)
var excPeer peer.ID
if len(excPeers) > 0 {
excPeer = excPeers[0]
}
pm.logger.Debug("Select Peers", zap.Stringer("selectionCriteria", criteria), zap.Stringer("excludedPeers", excPeer))
switch criteria.SelectionType { switch criteria.SelectionType {
case Automatic: case Automatic:
return pm.SelectRandom(criteria) return pm.SelectRandom(criteria)
@ -220,7 +220,7 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) (
criteria.Ctx = context.Background() criteria.Ctx = context.Background()
} }
if len(criteria.PubsubTopics) == 0 || (len(criteria.PubsubTopics) == 1 && criteria.PubsubTopics[0] == "") { if len(criteria.PubsubTopics) > 0 {
peers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, criteria.SpecificPeers...) peers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, criteria.SpecificPeers...)
} }
@ -253,6 +253,5 @@ func (pm *PeerManager) FilterPeersByProto(specificPeers peer.IDSlice, excludePee
peers = append(peers, peer) peers = append(peers, peer)
} }
} }
pm.logger.Debug("peers selected", zap.Int("peerCnt", len(peers)))
return peers, nil return peers, nil
} }