diff --git a/.gitignore b/.gitignore index 03bb1407..c1161ea7 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +.codeclimate.yml nodekey rlnKeystore.json test_onchain.json diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index bb198db3..f3d315b4 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -24,47 +24,49 @@ const WakuRelayIDv200 = protocol.ID("/vac/waku/relay/2.0.0") // PeerManager applies various controls and manage connections towards peers. type PeerManager struct { peerConnector *PeerConnectionStrategy - maxConnections int maxRelayPeers int logger *zap.Logger InRelayPeersTarget int OutRelayPeersTarget int host host.Host - serviceSlots map[protocol.ID][]peer.ID + serviceSlots *ServiceSlots ctx context.Context } -const maxRelayPeersShare = 5 - -// const defaultMaxOutRelayPeersTarget = 10 -const outRelayPeersShare = 3 const peerConnectivityLoopSecs = 15 -const minOutRelayConns = 10 + +// 80% relay peers 20% service peers +func relayAndServicePeers(maxConnections int) (int, int) { + return maxConnections - maxConnections/5, maxConnections / 5 +} + +// 66% inRelayPeers 33% outRelayPeers +func inAndOutRelayPeers(relayPeers int) (int, int) { + outRelayPeers := relayPeers / 3 + // + const minOutRelayConns = 10 + if outRelayPeers < minOutRelayConns { + outRelayPeers = minOutRelayConns + } + return relayPeers - outRelayPeers, outRelayPeers +} // NewPeerManager creates a new peerManager instance. func NewPeerManager(maxConnections int, logger *zap.Logger) *PeerManager { - maxRelayPeersValue := maxConnections - (maxConnections / maxRelayPeersShare) - outRelayPeersTargetValue := int(maxRelayPeersValue / outRelayPeersShare) - if outRelayPeersTargetValue < minOutRelayConns { - outRelayPeersTargetValue = minOutRelayConns - } - inRelayPeersTargetValue := maxRelayPeersValue - outRelayPeersTargetValue - if inRelayPeersTargetValue < 0 { - inRelayPeersTargetValue = 0 - } + maxRelayPeers, _ := relayAndServicePeers(maxConnections) + inRelayPeersTarget, outRelayPeersTarget := inAndOutRelayPeers(maxRelayPeers) pm := &PeerManager{ - maxConnections: maxConnections, logger: logger.Named("peer-manager"), - maxRelayPeers: maxRelayPeersValue, - InRelayPeersTarget: inRelayPeersTargetValue, - OutRelayPeersTarget: outRelayPeersTargetValue, - serviceSlots: make(map[protocol.ID][]peer.ID), + maxRelayPeers: maxRelayPeers, + InRelayPeersTarget: inRelayPeersTarget, + OutRelayPeersTarget: outRelayPeersTarget, + serviceSlots: NewServiceSlot(), } logger.Info("PeerManager init values", zap.Int("maxConnections", maxConnections), - zap.Int("maxRelayPeersValue", maxRelayPeersValue), - zap.Int("outRelayPeersTargetValue", outRelayPeersTargetValue), + zap.Int("maxRelayPeers", maxRelayPeers), + zap.Int("outRelayPeersTarget", outRelayPeersTarget), zap.Int("inRelayPeersTarget", pm.InRelayPeersTarget)) return pm @@ -274,13 +276,7 @@ func (pm *PeerManager) RemovePeer(peerID peer.ID) { pm.host.Peerstore().RemovePeer(peerID) //Search if this peer is in serviceSlot and if so, remove it from there // TODO:Add another peer which is statically configured to the serviceSlot. - for proto, peers := range pm.serviceSlots { - for i, peer := range peers { - if peer == peerID { - pm.serviceSlots[proto][i] = "" - } - } - } + pm.serviceSlots.removePeer(peerID) } // AddPeerToServiceSlot adds a peerID to serviceSlot. @@ -296,7 +292,8 @@ func (pm *PeerManager) AddPeerToServiceSlot(proto protocol.ID, peerID peer.ID) { //TODO: Ideally we should sort the peers per service and return best peer based on peer score or RTT etc. pm.logger.Info("Adding peer to service slots", logging.HostID("peer", peerID), zap.String("service", string(proto))) - pm.serviceSlots[proto] = append(pm.serviceSlots[proto], peerID) + // getPeers returns nil for WakuRelayIDv200 protocol, but we don't run this ServiceSlot code for WakuRelayIDv200 protocol + pm.serviceSlots.getPeers(proto).add(peerID) } // SelectPeer is used to return a random peer that supports a given protocol. @@ -310,19 +307,17 @@ func (pm *PeerManager) SelectPeer(proto protocol.ID, specificPeers []peer.ID, lo // - which topics they track // - latency? + //Try to fetch from serviceSlot + if slot := pm.serviceSlots.getPeers(proto); slot != nil { + if peerID, err := slot.getRandom(); err == nil { + return peerID, nil + } + } + + // if not found in serviceSlots or proto == WakuRelayIDv200 filteredPeers, err := utils.FilterPeersByProto(pm.host, specificPeers, proto) if err != nil { return "", err } - if proto == WakuRelayIDv200 { - return utils.SelectRandomPeer(filteredPeers, pm.logger) - } - - //Try to fetch from serviceSlot - peerIDs, ok := pm.serviceSlots[proto] - if ok || len(peerIDs) > 0 { - filteredPeers = peerIDs - } - return utils.SelectRandomPeer(filteredPeers, pm.logger) } diff --git a/waku/v2/peermanager/service_slot.go b/waku/v2/peermanager/service_slot.go new file mode 100644 index 00000000..aff2d856 --- /dev/null +++ b/waku/v2/peermanager/service_slot.go @@ -0,0 +1,78 @@ +package peermanager + +import ( + "sync" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" + "github.com/waku-org/go-waku/waku/v2/utils" +) + +type peerMap struct { + mu sync.RWMutex + m map[peer.ID]struct{} +} + +func newPeerMap() *peerMap { + return &peerMap{ + m: map[peer.ID]struct{}{}, + } +} + +func (pm *peerMap) getRandom() (peer.ID, error) { + pm.mu.RLock() + defer pm.mu.RUnlock() + for pID := range pm.m { + return pID, nil + } + return "", utils.ErrNoPeersAvailable + +} + +func (pm *peerMap) remove(pID peer.ID) { + pm.mu.Lock() + defer pm.mu.Unlock() + + delete(pm.m, pID) +} +func (pm *peerMap) add(pID peer.ID) { + pm.mu.Lock() + defer pm.mu.Unlock() + pm.m[pID] = struct{}{} +} + +// ServiceSlots is for storing service slots for a given protocol topic +type ServiceSlots struct { + mu sync.Mutex + m map[protocol.ID]*peerMap +} + +// NewServiceSlot is a constructor for ServiceSlot +func NewServiceSlot() *ServiceSlots { + return &ServiceSlots{ + m: map[protocol.ID]*peerMap{}, + } +} + +// getPeers for getting all the peers for a given protocol +// since peerMap is only used in peerManager that's why it is unexported +func (slots *ServiceSlots) getPeers(proto protocol.ID) *peerMap { + if proto == WakuRelayIDv200 { + return nil + } + slots.mu.Lock() + defer slots.mu.Unlock() + if slots.m[proto] == nil { + slots.m[proto] = newPeerMap() + } + return slots.m[proto] +} + +// RemovePeer for removing peer ID for a given protocol +func (slots *ServiceSlots) removePeer(peerID peer.ID) { + slots.mu.Lock() + defer slots.mu.Unlock() + for _, m := range slots.m { + m.remove(peerID) + } +} diff --git a/waku/v2/peermanager/service_slot_test.go b/waku/v2/peermanager/service_slot_test.go new file mode 100644 index 00000000..6d132e63 --- /dev/null +++ b/waku/v2/peermanager/service_slot_test.go @@ -0,0 +1,56 @@ +package peermanager + +import ( + "testing" + + "github.com/libp2p/go-libp2p/core/peer" + libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/waku/v2/utils" +) + +func TestServiceSlot(t *testing.T) { + slots := NewServiceSlot() + + protocol := libp2pProtocol.ID("test/protocol") + + peerId := peer.ID("peerId") + + // + slots.getPeers(protocol).add(peerId) + // + fetchedPeer, err := slots.getPeers(protocol).getRandom() + require.NoError(t, err) + require.Equal(t, peerId, fetchedPeer) + + // + slots.getPeers(protocol).remove(peerId) + // + _, err = slots.getPeers(protocol).getRandom() + require.Equal(t, err, utils.ErrNoPeersAvailable) +} + +func TestServiceSlotRemovePeerFromAll(t *testing.T) { + slots := NewServiceSlot() + + protocol := libp2pProtocol.ID("test/protocol") + protocol1 := libp2pProtocol.ID("test/protocol1") + + peerId := peer.ID("peerId") + + // + slots.getPeers(protocol).add(peerId) + slots.getPeers(protocol1).add(peerId) + // + fetchedPeer, err := slots.getPeers(protocol1).getRandom() + require.NoError(t, err) + require.Equal(t, peerId, fetchedPeer) + + // + slots.removePeer(peerId) + // + _, err = slots.getPeers(protocol).getRandom() + require.Equal(t, err, utils.ErrNoPeersAvailable) + _, err = slots.getPeers(protocol1).getRandom() + require.Equal(t, err, utils.ErrNoPeersAvailable) +}