feat: implement relay connectivity loop (#642)

* feat: implement relay connectivity loop

* chore: fix codeclimate issues

* Apply suggestions from code review

Co-authored-by: richΛrd <info@richardramos.me>

* chore: address review comments

---------

Co-authored-by: richΛrd <info@richardramos.me>
Prem Chaitanya Prathi 2023-08-15 06:57:51 +05:30 committed by GitHub
parent 419adcb6a8
commit 06f027b1a9
5 changed files with 203 additions and 97 deletions


@@ -243,8 +243,9 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
if err != nil {
w.log.Error("creating localnode", zap.Error(err))
}
//Initialize peer manager.
w.peermanager = peermanager.NewPeerManager(uint(w.opts.maxPeerConnections), w.log)
w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.log)
maxOutPeers := int(w.peermanager.OutRelayPeersTarget)
// Setup peer connection strategy
@@ -257,6 +258,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
if err != nil {
w.log.Error("creating peer connection strategy", zap.Error(err))
}
w.peermanager.SetPeerConnector(w.peerConnector)
if w.opts.enableDiscV5 {
err := w.mountDiscV5()


@@ -29,7 +29,8 @@ type PeerData struct {
ENR *enode.Node
}
// PeerConnectionStrategy is a utility to connect to peers, but only if we have not recently tried connecting to them already
// PeerConnectionStrategy is a utility to connect to peers,
// but only if we have not recently tried connecting to them already
type PeerConnectionStrategy struct {
sync.RWMutex
@@ -54,12 +55,17 @@ type PeerConnectionStrategy struct {
logger *zap.Logger
}
// NewPeerConnectionStrategy creates a utility to connect to peers, but only if we have not recently tried connecting to them already.
// NewPeerConnectionStrategy creates a utility to connect to peers,
// but only if we have not recently tried connecting to them already.
//
// cacheSize is the size of a TwoQueueCache
// dialTimeout is how long we attempt to connect to a peer before giving up
// minPeers is the minimum number of peers that the node should have
// backoff describes the strategy used to decide how long to backoff after previously attempting to connect to a peer
func NewPeerConnectionStrategy(cacheSize int, maxOutPeers int, dialTimeout time.Duration, backoff backoff.BackoffFactory, logger *zap.Logger) (*PeerConnectionStrategy, error) {
func NewPeerConnectionStrategy(cacheSize int, maxOutPeers int,
dialTimeout time.Duration, backoff backoff.BackoffFactory,
logger *zap.Logger) (*PeerConnectionStrategy, error) {
cache, err := lru.New2Q(cacheSize)
if err != nil {
return nil, err
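
For orientation, a minimal construction sketch of this strategy as a caller might use it. The go-libp2p backoff helper and the go-waku import path below are assumptions for illustration, not taken from this diff.

package main

import (
	"time"

	"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
	"go.uber.org/zap"

	"github.com/waku-org/go-waku/waku/v2/peermanager" // assumed package path
)

func main() {
	// Retry a given peer at most once per minute, remember up to 1024 peers in the
	// backoff cache, target 10 outbound relay peers, give up on a dial after 7s.
	bkf := backoff.NewFixedBackoff(time.Minute)
	pc, err := peermanager.NewPeerConnectionStrategy(1024, 10, 7*time.Second, bkf, zap.NewNop())
	if err != nil {
		panic(err)
	}
	_ = pc // pc.SetHost(h) and pc.Start(ctx) would follow once a libp2p host exists.
}
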
@@ -109,16 +115,18 @@ func (c *PeerConnectionStrategy) consumeSubscription(ctx context.Context, ch <-c
}
// Sets the host to be able to mount or consume a protocol
// SetHost sets the host to be able to mount or consume a protocol
func (c *PeerConnectionStrategy) SetHost(h host.Host) {
c.host = h
}
// SetPeerManager sets the peermanager in order to utilize add peer
func (c *PeerConnectionStrategy) SetPeerManager(pm *PeerManager) {
c.pm = pm
}
// Start attempts to connect to the peers passed in by peerCh. Will not connect to peers if they are within the backoff period.
// Start attempts to connect to the peers passed in by peerCh.
// Will not connect to peers if they are within the backoff period.
func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
if c.cancel != nil {
return errors.New("already started")
@@ -139,6 +147,7 @@ func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
return nil
}
// Stop terminates the peer-connector
func (c *PeerConnectionStrategy) Stop() {
if c.cancel == nil {
return
@@ -176,9 +185,9 @@ func (c *PeerConnectionStrategy) shouldDialPeers(ctx context.Context) {
return
case <-ticker.C:
isPaused := c.isPaused()
_, outRelayPeers, err := c.host.Peerstore().(wps.WakuPeerstore).GroupPeersByDirection()
_, outRelayPeers, err := c.pm.GroupPeersByDirection()
if err != nil {
c.logger.Info("Failed to get outRelayPeers from peerstore", zap.Error(err))
c.logger.Warn("failed to get outRelayPeers from peerstore", zap.Error(err))
continue
}
numPeers := outRelayPeers.Len()
@@ -245,6 +254,28 @@ func (c *PeerConnectionStrategy) workPublisher(ctx context.Context) {
const maxActiveDials = 5
func (c *PeerConnectionStrategy) canDialPeer(pi peer.AddrInfo) bool {
c.mux.Lock()
val, ok := c.cache.Get(pi.ID)
var cachedPeer *connCacheData
if ok {
tv := val.(*connCacheData)
now := time.Now()
if now.Before(tv.nextTry) {
c.mux.Unlock()
return false
}
tv.nextTry = now.Add(tv.strat.Delay())
} else {
cachedPeer = &connCacheData{strat: c.backoff()}
cachedPeer.nextTry = time.Now().Add(cachedPeer.strat.Delay())
c.cache.Add(pi.ID, cachedPeer)
}
c.mux.Unlock()
return true
}
func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) {
defer c.wg.Done()
@@ -262,51 +293,34 @@ func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) {
return
}
if pi.ID == c.host.ID() || pi.ID == "" {
if pi.ID == c.host.ID() || pi.ID == "" ||
c.host.Network().Connectedness(pi.ID) == network.Connected {
continue
}
if c.host.Network().Connectedness(pi.ID) == network.Connected {
continue
}
c.mux.Lock()
val, ok := c.cache.Get(pi.ID)
var cachedPeer *connCacheData
if ok {
tv := val.(*connCacheData)
now := time.Now()
if now.Before(tv.nextTry) {
c.mux.Unlock()
continue
}
tv.nextTry = now.Add(tv.strat.Delay())
if c.canDialPeer(pi) {
sem <- struct{}{}
c.wg.Add(1)
go c.dialPeer(pi, sem)
} else {
cachedPeer = &connCacheData{strat: c.backoff()}
cachedPeer.nextTry = time.Now().Add(cachedPeer.strat.Delay())
c.cache.Add(pi.ID, cachedPeer)
continue
}
c.mux.Unlock()
sem <- struct{}{}
c.wg.Add(1)
go func(pi peer.AddrInfo) {
defer c.wg.Done()
c.RLock()
ctx, cancel := context.WithTimeout(c.workerCtx, c.dialTimeout)
c.RUnlock()
defer cancel()
err := c.host.Connect(ctx, pi)
if err != nil && !errors.Is(err, context.Canceled) {
c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi)
c.logger.Info("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err))
}
<-sem
}(pi)
case <-ctx.Done():
return
}
}
}
func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) {
defer c.wg.Done()
c.RLock()
ctx, cancel := context.WithTimeout(c.workerCtx, c.dialTimeout)
c.RUnlock()
defer cancel()
err := c.host.Connect(ctx, pi)
if err != nil && !errors.Is(err, context.Canceled) {
c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi)
c.logger.Warn("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err))
}
<-sem
}
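
canDialPeer above keeps its per-peer backoff state in hashicorp's 2Q LRU cache (the lru.New2Q call earlier in this file). Below is a standalone sketch of just the cache calls it relies on; the import path assumes the v1 golang-lru module, and the entry type is a simplified stand-in for the real connCacheData.

package main

import (
	"fmt"
	"time"

	lru "github.com/hashicorp/golang-lru"
)

// connCacheData is a simplified stand-in: remember when a peer may be dialled again.
type connCacheData struct {
	nextTry time.Time
}

func main() {
	cache, _ := lru.New2Q(1024)

	// First sighting: schedule the next allowed attempt and dial now.
	cache.Add("peerA", &connCacheData{nextTry: time.Now().Add(time.Minute)})

	// Later attempts consult the cached entry before dialling again.
	if v, ok := cache.Get("peerA"); ok {
		entry := v.(*connCacheData)
		fmt.Println("within backoff window:", time.Now().Before(entry.nextTry)) // true for the next minute
	}
}
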


@@ -5,6 +5,7 @@ import (
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
@@ -22,12 +23,15 @@ const WakuRelayIDv200 = protocol.ID("/vac/waku/relay/2.0.0")
// PeerManager applies various controls and manage connections towards peers.
type PeerManager struct {
maxRelayPeers uint
peerConnector *PeerConnectionStrategy
maxConnections int
maxRelayPeers int
logger *zap.Logger
InRelayPeersTarget uint
OutRelayPeersTarget uint
InRelayPeersTarget int
OutRelayPeersTarget int
host host.Host
serviceSlots map[protocol.ID][]peer.ID
ctx context.Context
}
const maxRelayPeersShare = 5
@@ -35,78 +39,169 @@ const maxRelayPeersShare = 5
// const defaultMaxOutRelayPeersTarget = 10
const outRelayPeersShare = 3
const peerConnectivityLoopSecs = 15
const minOutRelayConns = 10
// NewPeerManager creates a new peerManager instance.
func NewPeerManager(maxConnections uint, logger *zap.Logger) *PeerManager {
func NewPeerManager(maxConnections int, logger *zap.Logger) *PeerManager {
maxRelayPeersValue := maxConnections - (maxConnections / maxRelayPeersShare)
outRelayPeersTargetValue := uint(maxRelayPeersValue / outRelayPeersShare)
outRelayPeersTargetValue := int(maxRelayPeersValue / outRelayPeersShare)
if outRelayPeersTargetValue < minOutRelayConns {
outRelayPeersTargetValue = minOutRelayConns
}
inRelayPeersTargetValue := maxRelayPeersValue - outRelayPeersTargetValue
if inRelayPeersTargetValue < 0 {
inRelayPeersTargetValue = 0
}
pm := &PeerManager{
maxConnections: maxConnections,
logger: logger.Named("peer-manager"),
maxRelayPeers: maxRelayPeersValue,
InRelayPeersTarget: maxRelayPeersValue - outRelayPeersTargetValue,
InRelayPeersTarget: inRelayPeersTargetValue,
OutRelayPeersTarget: outRelayPeersTargetValue,
serviceSlots: make(map[protocol.ID][]peer.ID),
}
logger.Info("PeerManager init values", zap.Uint("maxConnections", maxConnections),
zap.Uint("maxRelayPeersValue", maxRelayPeersValue), zap.Uint("outRelayPeersTargetValue", outRelayPeersTargetValue),
zap.Uint("inRelayPeersTarget", pm.InRelayPeersTarget))
logger.Info("PeerManager init values", zap.Int("maxConnections", maxConnections),
zap.Int("maxRelayPeersValue", maxRelayPeersValue),
zap.Int("outRelayPeersTargetValue", outRelayPeersTargetValue),
zap.Int("inRelayPeersTarget", pm.InRelayPeersTarget))
return pm
}
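
A worked example of the target split above, using the same integer arithmetic and the constants defined in this file; the 50-connection figure is only illustrative.

package main

import "fmt"

const (
	maxRelayPeersShare = 5 // as defined earlier in this file
	outRelayPeersShare = 3
	minOutRelayConns   = 10
)

func main() {
	maxConnections := 50
	maxRelayPeers := maxConnections - maxConnections/maxRelayPeersShare // 50 - 10 = 40
	outTarget := maxRelayPeers / outRelayPeersShare                     // 40 / 3 = 13, above minOutRelayConns
	if outTarget < minOutRelayConns {
		outTarget = minOutRelayConns
	}
	inTarget := maxRelayPeers - outTarget // 40 - 13 = 27
	if inTarget < 0 {
		inTarget = 0
	}
	fmt.Println(maxRelayPeers, outTarget, inTarget) // 40 13 27
}
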
// SetHost sets the host to be used in order to access the peerStore.
func (pm *PeerManager) SetHost(host host.Host) {
pm.host = host
}
// SetPeerConnector sets the peer connector to be used for establishing relay connections.
func (pm *PeerManager) SetPeerConnector(pc *PeerConnectionStrategy) {
pm.peerConnector = pc
}
// Start starts the processing to be done by peer manager.
func (pm *PeerManager) Start(ctx context.Context) {
pm.ctx = ctx
go pm.connectivityLoop(ctx)
}
// This is a connectivity loop, which currently checks and prunes inbound connections.
func (pm *PeerManager) connectivityLoop(ctx context.Context) {
t := time.NewTicker(peerConnectivityLoopSecs * time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
pm.pruneInRelayConns()
pm.connectToRelayPeers()
}
}
}
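
The loop only does useful work once the manager, the connection strategy and the libp2p host are wired together, which is what the waku_node hunk at the top of this diff starts doing. A minimal sketch of that wiring, assuming both objects are already constructed; the peermanager import path is illustrative and the exact call order in the node is not shown in this diff.

package example

import (
	"context"

	"github.com/libp2p/go-libp2p/core/host"

	"github.com/waku-org/go-waku/waku/v2/peermanager" // assumed package path
)

// wireAndStart mirrors the node-level setup: the manager and the connection
// strategy reference each other, both receive the host, and Start kicks off
// the 15-second connectivity loop above.
func wireAndStart(ctx context.Context, h host.Host, pm *peermanager.PeerManager, pc *peermanager.PeerConnectionStrategy) error {
	pm.SetPeerConnector(pc)
	pc.SetPeerManager(pm)
	pm.SetHost(h)
	pc.SetHost(h)
	if err := pc.Start(ctx); err != nil {
		return err
	}
	pm.Start(ctx) // launches connectivityLoop, which prunes inbound peers and dials new relay peers
	return nil
}
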
func (pm *PeerManager) pruneInRelayConns() {
// GroupPeersByDirection returns all the connected peers in peer store grouped by Inbound or outBound direction
func (pm *PeerManager) GroupPeersByDirection() (inPeers peer.IDSlice, outPeers peer.IDSlice, err error) {
var inRelayPeers peer.IDSlice
for _, p := range pm.host.Network().Peers() {
direction, err := pm.host.Peerstore().(wps.WakuPeerstore).Direction(p)
if err == nil {
if direction == network.DirInbound {
inPeers = append(inPeers, p)
} else if direction == network.DirOutbound {
outPeers = append(outPeers, p)
}
} else {
pm.logger.Error("Failed to retrieve peer direction",
logging.HostID("peerID", p), zap.Error(err))
}
}
return inPeers, outPeers, nil
}
func (pm *PeerManager) getRelayPeers() (inRelayPeers peer.IDSlice, outRelayPeers peer.IDSlice) {
//Group peers by their connected direction inbound or outbound.
inPeers, outPeers, err := pm.host.Peerstore().(wps.WakuPeerstore).GroupPeersByDirection()
inPeers, outPeers, err := pm.GroupPeersByDirection()
if err != nil {
return
}
pm.logger.Info("Number of peers connected", zap.Int("inPeers", inPeers.Len()), zap.Int("outPeers", outPeers.Len()))
pm.logger.Info("Number of peers connected", zap.Int("inPeers", inPeers.Len()),
zap.Int("outPeers", outPeers.Len()))
//Need to filter peers to check if they support relay
inRelayPeers, _ = utils.FilterPeersByProto(pm.host, inPeers, WakuRelayIDv200)
outRelayPeers, _ := utils.FilterPeersByProto(pm.host, outPeers, WakuRelayIDv200)
pm.logger.Info("Number of Relay peers connected", zap.Int("inRelayPeers", inRelayPeers.Len()), zap.Int("outRelayPeers", outRelayPeers.Len()))
outRelayPeers, _ = utils.FilterPeersByProto(pm.host, outPeers, WakuRelayIDv200)
pm.logger.Info("Number of Relay peers connected", zap.Int("inRelayPeers", inRelayPeers.Len()),
zap.Int("outRelayPeers", outRelayPeers.Len()))
return
}
if inRelayPeers.Len() > int(pm.InRelayPeersTarget) {
//Start disconnecting peers, based on what?
//For now, just disconnect most recently connected peers
//TODO: Need to have more intelligent way of doing this, maybe peer scores.
pm.logger.Info("Number of in peer connections exceed targer relay peers, hence pruning", zap.Int("inRelayPeers", inRelayPeers.Len()), zap.Uint("inRelayPeersTarget", pm.InRelayPeersTarget))
for pruningStartIndex := pm.InRelayPeersTarget; pruningStartIndex < uint(inRelayPeers.Len()); pruningStartIndex++ {
p := inRelayPeers[pruningStartIndex]
err := pm.host.Network().ClosePeer(p)
if err != nil {
pm.logger.Warn("Failed to disconnect connection towards peer", zap.String("peerID", p.String()))
}
pm.host.Peerstore().RemovePeer(p) //TODO: Should we remove the peer immediately?
pm.logger.Info("Successfully disconnected connection towards peer", zap.String("peerID", p.String()))
func (pm *PeerManager) connectToRelayPeers() {
//Check for out peer connections and connect to more peers.
inRelayPeers, outRelayPeers := pm.getRelayPeers()
if inRelayPeers.Len() > 0 &&
inRelayPeers.Len() > pm.InRelayPeersTarget {
pm.pruneInRelayConns(inRelayPeers, outRelayPeers)
}
if outRelayPeers.Len() > pm.OutRelayPeersTarget {
return
}
totalRelayPeers := inRelayPeers.Len() + outRelayPeers.Len()
// Establish additional connections if there are peers.
//What if the not connected peers in peerstore are not relay peers???
if totalRelayPeers < pm.host.Peerstore().Peers().Len() {
//Find not connected peers.
notConnectedPeers := pm.getNotConnectedPers()
//Figure out outside backoff peers.
//Connect to eligible peers.
numPeersToConnect := pm.maxRelayPeers - totalRelayPeers
if numPeersToConnect > notConnectedPeers.Len() {
numPeersToConnect = notConnectedPeers.Len() - 1
}
pm.connectToPeers(notConnectedPeers[0:numPeersToConnect])
} //Else: Should we raise some sort of unhealthy event??
}
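
To make the clamp in connectToRelayPeers concrete, a tiny worked example with purely illustrative numbers:

package main

import "fmt"

func main() {
	maxRelayPeers := 40
	totalRelayPeers := 10 // e.g. 4 inbound + 6 outbound relay connections
	notConnected := 12    // peers known in the peerstore but currently disconnected

	numPeersToConnect := maxRelayPeers - totalRelayPeers // 30 wanted
	if numPeersToConnect > notConnected {
		numPeersToConnect = notConnected - 1 // clamped to 11, exactly as the code above is written
	}
	fmt.Println(numPeersToConnect) // 11
}
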
func (pm *PeerManager) connectToPeers(peers peer.IDSlice) {
for _, peerID := range peers {
peerInfo := peer.AddrInfo{
ID: peerID,
Addrs: pm.host.Peerstore().Addrs(peerID),
}
pm.peerConnector.publishWork(pm.ctx, peerInfo)
}
}
func (pm *PeerManager) getNotConnectedPers() (notConnectedPeers peer.IDSlice) {
for _, peerID := range pm.host.Peerstore().Peers() {
if pm.host.Network().Connectedness(peerID) != network.Connected {
notConnectedPeers = append(notConnectedPeers, peerID)
}
}
return
}
func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice, outRelayPeers peer.IDSlice) {
//Start disconnecting peers, based on what?
//For now, just disconnect most recently connected peers
//TODO: Need to have more intelligent way of doing this, maybe peer scores.
pm.logger.Info("Number of in peer connections exceed targer relay peers, hence pruning",
zap.Int("inRelayPeers", inRelayPeers.Len()), zap.Int("inRelayPeersTarget", pm.InRelayPeersTarget))
for pruningStartIndex := pm.InRelayPeersTarget; pruningStartIndex < inRelayPeers.Len(); pruningStartIndex++ {
p := inRelayPeers[pruningStartIndex]
err := pm.host.Network().ClosePeer(p)
if err != nil {
pm.logger.Warn("Failed to disconnect connection towards peer",
logging.HostID("peerID", p))
}
pm.logger.Debug("Successfully disconnected connection towards peer",
logging.HostID("peerID", p))
}
}
@@ -120,7 +215,8 @@ func (pm *PeerManager) AddDiscoveredPeer(p PeerData) {
if p.ENR != nil {
err := pm.host.Peerstore().(wps.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR)
if err != nil {
pm.logger.Error("could not store enr", zap.Error(err), logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String()))
pm.logger.Error("could not store enr", zap.Error(err),
logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String()))
}
}
}
@@ -193,7 +289,8 @@ func (pm *PeerManager) AddPeerToServiceSlot(proto protocol.ID, peerID peer.ID, o
//For now adding the peer to serviceSlot which means the latest added peer would be given priority.
//TODO: Ideally we should sort the peers per service and return best peer based on peer score or RTT etc.
pm.logger.Info("Adding peer to service slots", logging.HostID("peer", peerID), zap.String("service", string(proto)))
pm.logger.Info("Adding peer to service slots", logging.HostID("peer", peerID),
zap.String("service", string(proto)))
pm.serviceSlots[proto] = append(pm.serviceSlots[proto], peerID)
}
@@ -219,8 +316,7 @@ func (pm *PeerManager) SelectPeer(proto protocol.ID, specificPeers []peer.ID, lo
//Try to fetch from serviceSlot
peerIDs, ok := pm.serviceSlots[proto]
if ok || len(peerIDs) > 0 {
pm.logger.Info("Got peer from service slots", logging.HostID("peer", peerIDs[0]))
return peerIDs[0], nil
filteredPeers = peerIDs
}
return utils.SelectRandomPeer(filteredPeers, pm.logger)
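
With this change a service-slot hit is no longer returned directly; it is fed into the same random selection used for protocol-filtered peers, which is what the updated test below checks. A hedged caller-side sketch; the go-waku import paths and the utils.Logger helper are assumptions based on the test code.

package example

import (
	"github.com/libp2p/go-libp2p/core/peer"

	"github.com/waku-org/go-waku/waku/v2/peermanager" // assumed package path
	"github.com/waku-org/go-waku/waku/v2/utils"       // assumed package path
)

// pickRelayPeer returns some relay-capable peer: after this change any peer in
// the relay service slot may come back, not just the most recently added one.
func pickRelayPeer(pm *peermanager.PeerManager) (peer.ID, error) {
	return pm.SelectPeer(peermanager.WakuRelayIDv200, nil, utils.Logger())
}
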


@@ -49,6 +49,12 @@ func TestServiceSlots(t *testing.T) {
peerId, err = pm.SelectPeer(protocol, nil, utils.Logger())
require.NoError(t, err)
if peerId == h2.ID() || peerId == h1.ID() {
//Test success
t.Log("Random peer selection per protocol successful")
} else {
t.FailNow()
}
require.Equal(t, peerId, h2.ID())
h1.Peerstore().AddAddrs(h3.ID(), h3.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
@@ -64,7 +70,12 @@ func TestServiceSlots(t *testing.T) {
//Test peer selection from first added peer to serviceSlot
peerId, err = pm.SelectPeer(protocol, nil, utils.Logger())
require.NoError(t, err)
require.Equal(t, peerId, h2.ID())
if peerId == h2.ID() || peerId == h3.ID() {
//Test success
t.Log("Random peer selection per protocol successful")
} else {
t.FailNow()
}
//Test peer selection for specific protocol
peerId, err = pm.SelectPeer(protocol1, nil, utils.Logger())


@@ -51,7 +51,6 @@ type WakuPeerstore interface {
SetDirection(p peer.ID, direction network.Direction) error
Direction(p peer.ID) (network.Direction, error)
GroupPeersByDirection() (inPeers peer.IDSlice, outPeers peer.IDSlice, err error)
}
// NewWakuPeerstore creates a new WakuPeerStore object
@@ -140,19 +139,3 @@ func (ps *WakuPeerstoreImpl) Direction(p peer.ID) (network.Direction, error) {
return result.(network.Direction), nil
}
// GroupPeersByDirection returns all the peers in peer store grouped by Inbound or outBound direction
func (ps *WakuPeerstoreImpl) GroupPeersByDirection() (inPeers peer.IDSlice, outPeers peer.IDSlice, err error) {
for _, p := range ps.Peers() {
direction, err := ps.Direction(p)
if err == nil {
if direction == network.DirInbound {
inPeers = append(inPeers, p)
} else if direction == network.DirOutbound {
outPeers = append(outPeers, p)
}
}
}
return inPeers, outPeers, nil
}