From c890b1fee8146999a1f8c456b3542ed93f17dd89 Mon Sep 17 00:00:00 2001 From: harsh-98 Date: Thu, 4 May 2023 16:56:27 +0530 Subject: [PATCH] refactor(peerExchange): use lru for storing peers --- waku/v2/discv5/discover.go | 3 + waku/v2/protocol/peer_exchange/enr_cache.go | 71 ++++++++++ waku/v2/protocol/peer_exchange/protocol.go | 149 +++----------------- 3 files changed, 96 insertions(+), 127 deletions(-) create mode 100644 waku/v2/protocol/peer_exchange/enr_cache.go diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index 01c98767..f21267eb 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -255,6 +255,9 @@ func evaluateNode(node *enode.Node) bool { return true } +// get random nodes from DHT via discv5 listender +// used for caching enr address in peerExchange +// used for connecting to peers in discovery_connector func (d *DiscoveryV5) Iterator() (enode.Iterator, error) { if d.listener == nil { return nil, ErrNoDiscV5Listener diff --git a/waku/v2/protocol/peer_exchange/enr_cache.go b/waku/v2/protocol/peer_exchange/enr_cache.go new file mode 100644 index 00000000..382a9d5b --- /dev/null +++ b/waku/v2/protocol/peer_exchange/enr_cache.go @@ -0,0 +1,71 @@ +package peer_exchange + +import ( + "bufio" + "bytes" + "math/rand" + "sync" + + "github.com/ethereum/go-ethereum/p2p/enode" + lru "github.com/hashicorp/golang-lru" + "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb" +) + +// there is Arccache which is also thread safe but it is too verbose for the use-case and adds unnecessary overhead +type enrCache struct { + // using lru, saves us from periodically cleaning the cache to maintain a certain size + data *lru.Cache + rng *rand.Rand + mu sync.RWMutex +} + +// err on negative size +func newEnrCache(size int) (*enrCache, error) { + inner, err := lru.New(size) + return &enrCache{ + data: inner, + rng: rand.New(rand.NewSource(rand.Int63())), + }, err +} + +// updating cache +func (c *enrCache) updateCache(node *enode.Node) { + c.mu.Lock() + defer c.mu.Unlock() + c.data.Add(node.ID(), node) +} + +// get `numPeers` records of enr +func (c *enrCache) getENRs(neededPeers int) ([]*pb.PeerInfo, error) { + c.mu.RLock() + defer c.mu.RUnlock() + // + availablePeers := c.data.Len() + if availablePeers == 0 { + return nil, nil + } + if availablePeers < neededPeers { + neededPeers = availablePeers + } + + perm := c.rng.Perm(availablePeers)[0:neededPeers] + keys := c.data.Keys() + result := []*pb.PeerInfo{} + for _, ind := range perm { + node, ok := c.data.Get(keys[ind]) + if !ok { + continue + } + var b bytes.Buffer + writer := bufio.NewWriter(&b) + err := node.(*enode.Node).Record().EncodeRLP(writer) + if err != nil { + return nil, err + } + writer.Flush() + result = append(result, &pb.PeerInfo{ + ENR: b.Bytes(), + }) + } + return result, nil +} diff --git a/waku/v2/protocol/peer_exchange/protocol.go b/waku/v2/protocol/peer_exchange/protocol.go index ae48a0cd..cfeaf706 100644 --- a/waku/v2/protocol/peer_exchange/protocol.go +++ b/waku/v2/protocol/peer_exchange/protocol.go @@ -1,17 +1,13 @@ package peer_exchange import ( - "bufio" - "bytes" "context" "errors" "fmt" "math" - "math/rand" "sync" "time" - "github.com/ethereum/go-ethereum/p2p/enode" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -29,18 +25,12 @@ import ( // PeerExchangeID_v20alpha1 is the current Waku Peer Exchange protocol identifier const PeerExchangeID_v20alpha1 = libp2pProtocol.ID("/vac/waku/peer-exchange/2.0.0-alpha1") const MaxCacheSize = 1000 -const CacheCleanWindow = 200 var ( ErrNoPeersAvailable = errors.New("no suitable remote peers") ErrInvalidId = errors.New("invalid request id") ) -type peerRecord struct { - node *enode.Node - idx int -} - type WakuPeerExchange struct { h host.Host disc *discv5.DiscoveryV5 @@ -51,10 +41,8 @@ type WakuPeerExchange struct { wg sync.WaitGroup peerConnector PeerConnector - peerCh chan peer.AddrInfo - enrCache map[enode.ID]peerRecord // todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/ + enrCache *enrCache enrCacheMutex sync.RWMutex - rng *rand.Rand } type PeerConnector interface { @@ -63,11 +51,14 @@ type PeerConnector interface { // NewWakuPeerExchange returns a new instance of WakuPeerExchange struct func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, log *zap.Logger) (*WakuPeerExchange, error) { + newEnrCache, err := newEnrCache(MaxCacheSize) + if err != nil { + return nil, err + } wakuPX := new(WakuPeerExchange) wakuPX.disc = disc wakuPX.log = log.Named("wakupx") - wakuPX.enrCache = make(map[enode.ID]peerRecord) - wakuPX.rng = rand.New(rand.NewSource(rand.Int63())) + wakuPX.enrCache = newEnrCache wakuPX.peerConnector = peerConnector return wakuPX, nil } @@ -87,7 +78,6 @@ func (wakuPX *WakuPeerExchange) Start(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) wakuPX.cancel = cancel - wakuPX.peerCh = make(chan peer.AddrInfo) wakuPX.h.SetStreamHandlerMatch(PeerExchangeID_v20alpha1, protocol.PrefixTextMatch(string(PeerExchangeID_v20alpha1)), wakuPX.onRequest(ctx)) wakuPX.log.Info("Peer exchange protocol started") @@ -113,7 +103,7 @@ func (wakuPX *WakuPeerExchange) onRequest(ctx context.Context) func(s network.St if requestRPC.Query != nil { logger.Info("request received") - records, err := wakuPX.getENRsFromCache(requestRPC.Query.NumPeers) + records, err := wakuPX.enrCache.getENRs(int(requestRPC.Query.NumPeers)) if err != nil { logger.Error("obtaining enrs from cache", zap.Error(err)) metrics.RecordPeerExchangeError(ctx, "pxFailure") @@ -142,101 +132,18 @@ func (wakuPX *WakuPeerExchange) Stop() { } wakuPX.h.RemoveStreamHandler(PeerExchangeID_v20alpha1) wakuPX.cancel() - close(wakuPX.peerCh) wakuPX.wg.Wait() } -func (wakuPX *WakuPeerExchange) getENRsFromCache(numPeers uint64) ([]*pb.PeerInfo, error) { - wakuPX.enrCacheMutex.Lock() - defer wakuPX.enrCacheMutex.Unlock() - - if len(wakuPX.enrCache) == 0 { - return nil, nil - } - - numItems := int(numPeers) - if len(wakuPX.enrCache) < int(numPeers) { - numItems = len(wakuPX.enrCache) - } - - perm := wakuPX.rng.Perm(len(wakuPX.enrCache))[0:numItems] - permSet := make(map[int]int) - for i, v := range perm { - permSet[v] = i - } - - var result []*pb.PeerInfo - iter := 0 - for k := range wakuPX.enrCache { - if _, ok := permSet[iter]; ok { - var b bytes.Buffer - writer := bufio.NewWriter(&b) - enode := wakuPX.enrCache[k] - - err := enode.node.Record().EncodeRLP(writer) - if err != nil { - return nil, err - } - - writer.Flush() - - result = append(result, &pb.PeerInfo{ - ENR: b.Bytes(), - }) - } - iter++ - } - - return result, nil -} - -func (wakuPX *WakuPeerExchange) cleanCache() { - if len(wakuPX.enrCache) < MaxCacheSize { - return - } - - r := make(map[enode.ID]peerRecord) - for k, v := range wakuPX.enrCache { - if v.idx > CacheCleanWindow { - v.idx -= CacheCleanWindow - r[k] = v - } - } - - wakuPX.enrCache = r -} - func (wakuPX *WakuPeerExchange) iterate(ctx context.Context) error { iterator, err := wakuPX.disc.Iterator() if err != nil { return fmt.Errorf("obtaining iterator: %w", err) } + // Closing iterator + defer iterator.Close() - closeCh := make(chan struct{}, 1) - defer close(closeCh) - - // Closing iterator when context is cancelled or function is returning - wakuPX.wg.Add(1) - go func() { - defer wakuPX.wg.Done() - select { - case <-ctx.Done(): - iterator.Close() - case <-closeCh: - iterator.Close() - } - }() - - for { - if ctx.Err() != nil { - break - } - - exists := iterator.Next() - if !exists { - break - } - + for iterator.Next() { _, addresses, err := enr.Multiaddress(iterator.Node()) if err != nil { wakuPX.log.Error("extracting multiaddrs from enr", zap.Error(err)) @@ -248,15 +155,14 @@ func (wakuPX *WakuPeerExchange) iterate(ctx context.Context) error { } wakuPX.log.Debug("Discovered px peers via discv5") + wakuPX.enrCache.updateCache(iterator.Node()) - wakuPX.enrCacheMutex.Lock() - wakuPX.enrCache[iterator.Node().ID()] = peerRecord{ - idx: len(wakuPX.enrCache), - node: iterator.Node(), + select { + case <-ctx.Done(): + return nil + default: } - wakuPX.enrCacheMutex.Unlock() } - return nil } @@ -269,27 +175,16 @@ func (wakuPX *WakuPeerExchange) runPeerExchangeDiscv5Loop(ctx context.Context) { return } - ch := make(chan struct{}, 1) - ch <- struct{}{} // Initial execution - - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - -restartLoop: for { + err := wakuPX.iterate(ctx) + if err != nil { + wakuPX.log.Debug("iterating peer exchange", zap.Error(err)) + time.Sleep(2 * time.Second) + } select { - case <-ch: - err := wakuPX.iterate(ctx) - if err != nil { - wakuPX.log.Debug("iterating peer exchange", zap.Error(err)) - time.Sleep(2 * time.Second) - } - ch <- struct{}{} - case <-ticker.C: - wakuPX.cleanCache() case <-ctx.Done(): - close(ch) - break restartLoop + return + default: } } }