refactor(peerExchange): use lru for storing peers

This commit is contained in:
harsh-98 2023-05-04 16:56:27 +05:30 committed by RichΛrd
parent c15f4ada56
commit c890b1fee8
3 changed files with 96 additions and 127 deletions

View File

@ -255,6 +255,9 @@ func evaluateNode(node *enode.Node) bool {
return true
}
// get random nodes from DHT via discv5 listender
// used for caching enr address in peerExchange
// used for connecting to peers in discovery_connector
func (d *DiscoveryV5) Iterator() (enode.Iterator, error) {
if d.listener == nil {
return nil, ErrNoDiscV5Listener

View File

@ -0,0 +1,71 @@
package peer_exchange
import (
"bufio"
"bytes"
"math/rand"
"sync"
"github.com/ethereum/go-ethereum/p2p/enode"
lru "github.com/hashicorp/golang-lru"
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb"
)
// there is Arccache which is also thread safe but it is too verbose for the use-case and adds unnecessary overhead
type enrCache struct {
// using lru, saves us from periodically cleaning the cache to maintain a certain size
data *lru.Cache
rng *rand.Rand
mu sync.RWMutex
}
// err on negative size
func newEnrCache(size int) (*enrCache, error) {
inner, err := lru.New(size)
return &enrCache{
data: inner,
rng: rand.New(rand.NewSource(rand.Int63())),
}, err
}
// updating cache
func (c *enrCache) updateCache(node *enode.Node) {
c.mu.Lock()
defer c.mu.Unlock()
c.data.Add(node.ID(), node)
}
// get `numPeers` records of enr
func (c *enrCache) getENRs(neededPeers int) ([]*pb.PeerInfo, error) {
c.mu.RLock()
defer c.mu.RUnlock()
//
availablePeers := c.data.Len()
if availablePeers == 0 {
return nil, nil
}
if availablePeers < neededPeers {
neededPeers = availablePeers
}
perm := c.rng.Perm(availablePeers)[0:neededPeers]
keys := c.data.Keys()
result := []*pb.PeerInfo{}
for _, ind := range perm {
node, ok := c.data.Get(keys[ind])
if !ok {
continue
}
var b bytes.Buffer
writer := bufio.NewWriter(&b)
err := node.(*enode.Node).Record().EncodeRLP(writer)
if err != nil {
return nil, err
}
writer.Flush()
result = append(result, &pb.PeerInfo{
ENR: b.Bytes(),
})
}
return result, nil
}

View File

@ -1,17 +1,13 @@
package peer_exchange
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"math"
"math/rand"
"sync"
"time"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
@ -29,18 +25,12 @@ import (
// PeerExchangeID_v20alpha1 is the current Waku Peer Exchange protocol identifier
const PeerExchangeID_v20alpha1 = libp2pProtocol.ID("/vac/waku/peer-exchange/2.0.0-alpha1")
const MaxCacheSize = 1000
const CacheCleanWindow = 200
var (
ErrNoPeersAvailable = errors.New("no suitable remote peers")
ErrInvalidId = errors.New("invalid request id")
)
type peerRecord struct {
node *enode.Node
idx int
}
type WakuPeerExchange struct {
h host.Host
disc *discv5.DiscoveryV5
@ -51,10 +41,8 @@ type WakuPeerExchange struct {
wg sync.WaitGroup
peerConnector PeerConnector
peerCh chan peer.AddrInfo
enrCache map[enode.ID]peerRecord // todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/
enrCache *enrCache
enrCacheMutex sync.RWMutex
rng *rand.Rand
}
type PeerConnector interface {
@ -63,11 +51,14 @@ type PeerConnector interface {
// NewWakuPeerExchange returns a new instance of WakuPeerExchange struct
func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, log *zap.Logger) (*WakuPeerExchange, error) {
newEnrCache, err := newEnrCache(MaxCacheSize)
if err != nil {
return nil, err
}
wakuPX := new(WakuPeerExchange)
wakuPX.disc = disc
wakuPX.log = log.Named("wakupx")
wakuPX.enrCache = make(map[enode.ID]peerRecord)
wakuPX.rng = rand.New(rand.NewSource(rand.Int63()))
wakuPX.enrCache = newEnrCache
wakuPX.peerConnector = peerConnector
return wakuPX, nil
}
@ -87,7 +78,6 @@ func (wakuPX *WakuPeerExchange) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
wakuPX.cancel = cancel
wakuPX.peerCh = make(chan peer.AddrInfo)
wakuPX.h.SetStreamHandlerMatch(PeerExchangeID_v20alpha1, protocol.PrefixTextMatch(string(PeerExchangeID_v20alpha1)), wakuPX.onRequest(ctx))
wakuPX.log.Info("Peer exchange protocol started")
@ -113,7 +103,7 @@ func (wakuPX *WakuPeerExchange) onRequest(ctx context.Context) func(s network.St
if requestRPC.Query != nil {
logger.Info("request received")
records, err := wakuPX.getENRsFromCache(requestRPC.Query.NumPeers)
records, err := wakuPX.enrCache.getENRs(int(requestRPC.Query.NumPeers))
if err != nil {
logger.Error("obtaining enrs from cache", zap.Error(err))
metrics.RecordPeerExchangeError(ctx, "pxFailure")
@ -142,101 +132,18 @@ func (wakuPX *WakuPeerExchange) Stop() {
}
wakuPX.h.RemoveStreamHandler(PeerExchangeID_v20alpha1)
wakuPX.cancel()
close(wakuPX.peerCh)
wakuPX.wg.Wait()
}
func (wakuPX *WakuPeerExchange) getENRsFromCache(numPeers uint64) ([]*pb.PeerInfo, error) {
wakuPX.enrCacheMutex.Lock()
defer wakuPX.enrCacheMutex.Unlock()
if len(wakuPX.enrCache) == 0 {
return nil, nil
}
numItems := int(numPeers)
if len(wakuPX.enrCache) < int(numPeers) {
numItems = len(wakuPX.enrCache)
}
perm := wakuPX.rng.Perm(len(wakuPX.enrCache))[0:numItems]
permSet := make(map[int]int)
for i, v := range perm {
permSet[v] = i
}
var result []*pb.PeerInfo
iter := 0
for k := range wakuPX.enrCache {
if _, ok := permSet[iter]; ok {
var b bytes.Buffer
writer := bufio.NewWriter(&b)
enode := wakuPX.enrCache[k]
err := enode.node.Record().EncodeRLP(writer)
if err != nil {
return nil, err
}
writer.Flush()
result = append(result, &pb.PeerInfo{
ENR: b.Bytes(),
})
}
iter++
}
return result, nil
}
func (wakuPX *WakuPeerExchange) cleanCache() {
if len(wakuPX.enrCache) < MaxCacheSize {
return
}
r := make(map[enode.ID]peerRecord)
for k, v := range wakuPX.enrCache {
if v.idx > CacheCleanWindow {
v.idx -= CacheCleanWindow
r[k] = v
}
}
wakuPX.enrCache = r
}
func (wakuPX *WakuPeerExchange) iterate(ctx context.Context) error {
iterator, err := wakuPX.disc.Iterator()
if err != nil {
return fmt.Errorf("obtaining iterator: %w", err)
}
// Closing iterator
defer iterator.Close()
closeCh := make(chan struct{}, 1)
defer close(closeCh)
// Closing iterator when context is cancelled or function is returning
wakuPX.wg.Add(1)
go func() {
defer wakuPX.wg.Done()
select {
case <-ctx.Done():
iterator.Close()
case <-closeCh:
iterator.Close()
}
}()
for {
if ctx.Err() != nil {
break
}
exists := iterator.Next()
if !exists {
break
}
for iterator.Next() {
_, addresses, err := enr.Multiaddress(iterator.Node())
if err != nil {
wakuPX.log.Error("extracting multiaddrs from enr", zap.Error(err))
@ -248,15 +155,14 @@ func (wakuPX *WakuPeerExchange) iterate(ctx context.Context) error {
}
wakuPX.log.Debug("Discovered px peers via discv5")
wakuPX.enrCache.updateCache(iterator.Node())
wakuPX.enrCacheMutex.Lock()
wakuPX.enrCache[iterator.Node().ID()] = peerRecord{
idx: len(wakuPX.enrCache),
node: iterator.Node(),
select {
case <-ctx.Done():
return nil
default:
}
wakuPX.enrCacheMutex.Unlock()
}
return nil
}
@ -269,27 +175,16 @@ func (wakuPX *WakuPeerExchange) runPeerExchangeDiscv5Loop(ctx context.Context) {
return
}
ch := make(chan struct{}, 1)
ch <- struct{}{} // Initial execution
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
restartLoop:
for {
select {
case <-ch:
err := wakuPX.iterate(ctx)
if err != nil {
wakuPX.log.Debug("iterating peer exchange", zap.Error(err))
time.Sleep(2 * time.Second)
}
ch <- struct{}{}
case <-ticker.C:
wakuPX.cleanCache()
select {
case <-ctx.Done():
close(ch)
break restartLoop
return
default:
}
}
}