From bc16421c7467067518335e378a39b6d66ff5ca02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Mon, 13 May 2024 14:56:34 -0400 Subject: [PATCH] feat: ping cache (#999) --- go.sum | 1 - waku/v2/node/wakuoptions.go | 2 - waku/v2/peermanager/fastest_peer_selector.go | 172 ++++++++++++++++++ .../peermanager/fastest_peer_selector_test.go | 48 +++++ waku/v2/peermanager/peer_manager.go | 4 +- waku/v2/peermanager/peer_selection.go | 57 +----- .../protocol/lightpush/waku_lightpush_test.go | 3 +- 7 files changed, 226 insertions(+), 61 deletions(-) create mode 100644 waku/v2/peermanager/fastest_peer_selector.go create mode 100644 waku/v2/peermanager/fastest_peer_selector_test.go diff --git a/go.sum b/go.sum index b7ee60f9..946fc732 100644 --- a/go.sum +++ b/go.sum @@ -564,7 +564,6 @@ github.com/go-latex/latex v0.0.0-20210118124228-b3d85cf34e07/go.mod h1:CO1AlKB2C github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= -github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 60608f77..90f67f06 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -12,7 +12,6 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p" - mplex "github.com/libp2p/go-libp2p-mplex" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/config" "github.com/libp2p/go-libp2p/core/crypto" @@ -559,7 +558,6 @@ var 
DefaultLibP2POptions = []libp2p.Option{ libp2p.UserAgent(UserAgent), libp2p.ChainOptions( libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport), - libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport), ), libp2p.EnableNATService(), libp2p.ConnectionManager(newConnManager(200, 300, connmgr.WithGracePeriod(0))), diff --git a/waku/v2/peermanager/fastest_peer_selector.go b/waku/v2/peermanager/fastest_peer_selector.go new file mode 100644 index 00000000..a7318238 --- /dev/null +++ b/waku/v2/peermanager/fastest_peer_selector.go @@ -0,0 +1,172 @@ +package peermanager + +import ( + "context" + "errors" + "sort" + "sync" + "time" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" + "github.com/waku-org/go-waku/logging" + "go.uber.org/zap" +) + +type FastestPeerSelector struct { + sync.RWMutex + + host host.Host + + logger *zap.Logger +} + +func NewFastestPeerSelector(logger *zap.Logger) *FastestPeerSelector { + return &FastestPeerSelector{ + logger: logger.Named("rtt-cache"), + } +} + +func (r *FastestPeerSelector) SetHost(h host.Host) { + r.host = h +} + +func (r *FastestPeerSelector) PingPeer(ctx context.Context, peer peer.ID) (time.Duration, error) { + if peer == r.host.ID() { + return 0, errors.New("can't ping yourself") + } + + ctx, cancel := context.WithTimeout(ctx, 7*time.Second) + defer cancel() + + select { + case <-ctx.Done(): + return 0, ctx.Err() + + case result := <-ping.Ping(ctx, r.host, peer): + r.Lock() + defer r.Unlock() + + if result.Error == nil { + return result.RTT, nil + } else { + r.logger.Debug("could not ping", logging.HostID("peer", peer), zap.Error(result.Error)) + return 0, result.Error + } + } + +} + +func (r *FastestPeerSelector) FastestPeer(ctx context.Context, peers peer.IDSlice) (peer.ID, error) { + var peerRTT []pingResult + var peerRTTMutex sync.Mutex + + wg := sync.WaitGroup{} + pingCh := make(chan peer.ID) + + 
pinged := make(map[peer.ID]struct{}) + + go func() { + // Ping any peer with no latency recorded + for peerToPing := range pingCh { + go func(p peer.ID) { + defer wg.Done() + rtt := time.Hour + result, err := r.PingPeer(ctx, p) + if err == nil { + rtt = result + } + + peerRTTMutex.Lock() + peerRTT = append(peerRTT, pingResult{ + peerID: p, + rtt: rtt, + connectedness: r.host.Network().Connectedness(p), + }) + peerRTTMutex.Unlock() + }(peerToPing) + } + }() + + for _, p := range peers { + latency := r.host.Peerstore().LatencyEWMA(p) + if latency == 0 { + wg.Add(1) + pinged[p] = struct{}{} // To avoid double pings + pingCh <- p + } else { + peerRTTMutex.Lock() + peerRTT = append(peerRTT, pingResult{ + peerID: p, + rtt: latency, + connectedness: r.host.Network().Connectedness(p), + }) + peerRTTMutex.Unlock() + } + } + + // Wait for pings to be done (if any) + wg.Wait() + close(pingCh) + + sort.Sort(pingSort(peerRTT)) + + for _, p := range peerRTT { + if p.rtt == time.Hour { + continue // time.Hour marks a failed ping; skip it instead of stopping, reachable peers may follow + } + + // Make sure peer is reachable + _, exists := pinged[p.peerID] // Did we just ping the peer? 
+ if !exists { + _, err := r.PingPeer(ctx, p.peerID) + if err != nil { + continue + } else { + if p.rtt != time.Hour { + return p.peerID, nil + } + } + } else { + if p.rtt != time.Hour { + return p.peerID, nil + } + } + } + + return "", ErrNoPeersAvailable +} + +type pingResult struct { + peerID peer.ID + rtt time.Duration + connectedness network.Connectedness +} + +type pingSort []pingResult + +func (a pingSort) Len() int { + return len(a) +} + +func (a pingSort) Swap(i, j int) { + a[i], a[j] = a[j], a[i] +} + +var connectednessPriority map[network.Connectedness]int + +func init() { + // Closer to 0 is preferred + connectednessPriority = map[network.Connectedness]int{ + network.CanConnect: 1, + network.Connected: 1, + network.NotConnected: 2, + network.CannotConnect: 3, + } +} + +func (a pingSort) Less(i, j int) bool { + return connectednessPriority[a[i].connectedness] < connectednessPriority[a[j].connectedness] || (connectednessPriority[a[i].connectedness] == connectednessPriority[a[j].connectedness] && a[i].rtt < a[j].rtt) // priority decides first; RTT only breaks ties between equal priorities +} diff --git a/waku/v2/peermanager/fastest_peer_selector_test.go b/waku/v2/peermanager/fastest_peer_selector_test.go new file mode 100644 index 00000000..6576d11c --- /dev/null +++ b/waku/v2/peermanager/fastest_peer_selector_test.go @@ -0,0 +1,48 @@ +package peermanager + +import ( + "context" + "crypto/rand" + "testing" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/utils" +) + +func TestRTT(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + h1, _ := tests.MakeHost(ctx, 0, rand.Reader) + h2, _ := tests.MakeHost(ctx, 0, rand.Reader) + h3, _ := tests.MakeHost(ctx, 0, rand.Reader) + + h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL) + h1.Peerstore().AddAddrs(h3.ID(), h3.Addrs(), peerstore.PermanentAddrTTL) + + rtt := NewFastestPeerSelector(utils.Logger()) + rtt.SetHost(h1) + + _, err 
:= rtt.FastestPeer(ctx, peer.IDSlice{h2.ID(), h3.ID()}) + require.NoError(t, err) + + // Simulate H3 being no longer available + h3.Close() + + _, err = rtt.FastestPeer(ctx, peer.IDSlice{h3.ID()}) + require.ErrorIs(t, err, ErrNoPeersAvailable) + + // H3 should never return + for i := 0; i < 100; i++ { + p, err := rtt.FastestPeer(ctx, peer.IDSlice{h2.ID(), h3.ID()}) + if err != nil { + require.ErrorIs(t, err, ErrNoPeersAvailable) + } else { + require.NotEqual(t, h3.ID(), p) + } + } +} diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index d1af8ce3..b4ec439d 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -84,6 +84,7 @@ type PeerManager struct { discoveryService *discv5.DiscoveryV5 wakuprotoToENRFieldMap map[protocol.ID]WakuProtoInfo TopicHealthNotifCh chan<- TopicHealthStatus + rttCache *FastestPeerSelector } // PeerSelection provides various options based on which Peer is selected from a list of peers. @@ -188,6 +189,7 @@ func NewPeerManager(maxConnections int, maxPeers int, metadata *metadata.WakuMet subRelayTopics: make(map[string]*NodeTopicDetails), maxPeers: maxPeers, wakuprotoToENRFieldMap: map[protocol.ID]WakuProtoInfo{}, + rttCache: NewFastestPeerSelector(logger), } logger.Info("PeerManager init values", zap.Int("maxConnections", maxConnections), zap.Int("maxRelayPeers", maxRelayPeers), @@ -206,6 +208,7 @@ func (pm *PeerManager) SetDiscv5(discv5 *discv5.DiscoveryV5) { // SetHost sets the host to be used in order to access the peerStore. func (pm *PeerManager) SetHost(host host.Host) { pm.host = host + pm.rttCache.SetHost(host) } // SetPeerConnector sets the peer connector to be used for establishing relay connections. @@ -215,7 +218,6 @@ func (pm *PeerManager) SetPeerConnector(pc *PeerConnectionStrategy) { // Start starts the processing to be done by peer manager. 
func (pm *PeerManager) Start(ctx context.Context) { - pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField) pm.ctx = ctx diff --git a/waku/v2/peermanager/peer_selection.go b/waku/v2/peermanager/peer_selection.go index 2b4c3807..4c1268bb 100644 --- a/waku/v2/peermanager/peer_selection.go +++ b/waku/v2/peermanager/peer_selection.go @@ -3,13 +3,9 @@ package peermanager import ( "context" "errors" - "sync" - "time" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" - "github.com/libp2p/go-libp2p/p2p/protocol/ping" - "github.com/waku-org/go-waku/logging" wps "github.com/waku-org/go-waku/waku/v2/peerstore" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" "go.uber.org/zap" @@ -178,11 +174,6 @@ func (pm *PeerManager) SelectPeers(criteria PeerSelectionCriteria) (peer.IDSlice } } -type pingResult struct { - p peer.ID - rtt time.Duration -} - // SelectPeerWithLowestRTT will select a peer that supports a specific protocol with the lowest reply time // If a list of specific peers is passed, the peer will be chosen from that list assuming // it supports the chosen protocol, otherwise it will chose a peer from the node peerstore @@ -204,54 +195,8 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) ( if err != nil { return "", err } - wg := sync.WaitGroup{} - waitCh := make(chan struct{}) - pingCh := make(chan pingResult, 1000) - wg.Add(len(peers)) - - go func() { - for _, p := range peers { - go func(p peer.ID) { - defer wg.Done() - ctx, cancel := context.WithTimeout(criteria.Ctx, 3*time.Second) - defer cancel() - result := <-ping.Ping(ctx, pm.host, p) - if result.Error == nil { - pingCh <- pingResult{ - p: p, - rtt: result.RTT, - } - } else { - pm.logger.Debug("could not ping", logging.HostID("peer", p), zap.Error(result.Error)) - } - }(p) - } - wg.Wait() - close(waitCh) - close(pingCh) - }() - - select { - case <-waitCh: - var min *pingResult - for p := range pingCh { - if min == nil { - 
min = &p - } else { - if p.rtt < min.rtt { - min = &p - } - } - } - if min == nil { - return "", ErrNoPeersAvailable - } - - return min.p, nil - case <-criteria.Ctx.Done(): - return "", ErrNoPeersAvailable - } + return pm.rttCache.FastestPeer(criteria.Ctx, peers) } // FilterPeersByProto filters list of peers that support specified protocols. diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index 189c21c8..8b3d93bb 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -7,12 +7,13 @@ import ( "testing" "time" + "github.com/waku-org/go-waku/waku/v2/peermanager" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" - "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource"