Feat/peer manager (#596)

* chore: refactor existing code into peer maanger package

* feat: move peer connection related code into peer manager

* feat: in relay peer connection pruning

* feat: add max-connections CLI flag and limit outRelayPeers based on max-connections #621

* tested both in and out relay connection limits

Co-authored-by: richΛrd <info@richardramos.me>

* Review comment, use context to pause connectivity loop during node shutdown.

Co-authored-by: richΛrd <info@richardramos.me>

* address review comments


---------

Co-authored-by: richΛrd <info@richardramos.me>
This commit is contained in:
Prem Chaitanya Prathi 2023-08-03 21:51:15 +05:30 committed by GitHub
parent 33344c2ae0
commit 34de2941c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 294 additions and 93 deletions

View File

@ -25,6 +25,13 @@ var (
Destination: &options.Address,
EnvVars: []string{"WAKUNODE2_ADDRESS"},
})
MaxPeerConnections = altsrc.NewIntFlag(&cli.IntFlag{
Name: "max-connections",
Value: 50,
Usage: "Maximum allowed number of libp2p connections.",
Destination: &options.MaxPeerConnections,
EnvVars: []string{"WAKUNODE2_MAX_CONNECTIONS"},
})
WebsocketSupport = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "websocket-support",
Aliases: []string{"ws"},

View File

@ -19,6 +19,7 @@ func main() {
&cli.StringFlag{Name: "config-file", Usage: "loads configuration from a TOML file (cmd-line parameters take precedence)"},
TcpPort,
Address,
MaxPeerConnections,
WebsocketSupport,
WebsocketPort,
WebsocketSecurePort,

View File

@ -20,7 +20,7 @@ import (
"github.com/waku-org/go-waku/waku/persistence/sqlite"
dbutils "github.com/waku-org/go-waku/waku/persistence/utils"
wmetrics "github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/peers"
wakupeerstore "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/rendezvous"
"github.com/ethereum/go-ethereum/accounts/keystore"
@ -129,6 +129,7 @@ func Execute(options Options) {
node.WithPrivateKey(prvKey),
node.WithHostAddress(hostAddr),
node.WithKeepAlive(options.KeepAlive),
node.WithMaxPeerConnections(options.MaxPeerConnections),
}
if len(options.AdvertiseAddresses) != 0 {
nodeOpts = append(nodeOpts, node.WithAdvertiseAddresses(options.AdvertiseAddresses...))
@ -403,7 +404,7 @@ func Execute(options Options) {
if options.PeerExchange.Enable && options.PeerExchange.Node != nil {
logger.Info("retrieving peer info via peer exchange protocol")
peerID, err := wakuNode.AddPeer(*options.PeerExchange.Node, peers.Static, peer_exchange.PeerExchangeID_v20alpha1)
peerID, err := wakuNode.AddPeer(*options.PeerExchange.Node, wakupeerstore.Static, peer_exchange.PeerExchangeID_v20alpha1)
if err != nil {
logger.Error("adding peer exchange peer", logging.MultiAddrs("node", *options.PeerExchange.Node), zap.Error(err))
} else {
@ -434,7 +435,7 @@ func Execute(options Options) {
var peerIDs []peer.ID
for _, n := range options.Store.ResumeNodes {
pID, err := wakuNode.AddPeer(n, peers.Static, store.StoreID_v20beta4)
pID, err := wakuNode.AddPeer(n, wakupeerstore.Static, store.StoreID_v20beta4)
if err != nil {
logger.Warn("adding peer to peerstore", logging.MultiAddrs("peer", n), zap.Error(err))
}
@ -502,7 +503,7 @@ func Execute(options Options) {
func addStaticPeers(wakuNode *node.WakuNode, addresses []multiaddr.Multiaddr, protocols ...protocol.ID) {
for _, addr := range addresses {
_, err := wakuNode.AddPeer(addr, peers.Static, protocols...)
_, err := wakuNode.AddPeer(addr, wakupeerstore.Static, protocols...)
failOnErr(err, "error adding peer")
}
}

View File

@ -170,6 +170,7 @@ type Options struct {
PersistPeers bool
UserAgent string
PProf bool
MaxPeerConnections int
PeerExchange PeerExchangeOptions
Websocket WSOptions

View File

@ -12,7 +12,7 @@ import (
"github.com/go-chi/chi/v5"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/peers"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
@ -197,7 +197,7 @@ func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()
_, err = d.node.AddPeer(peerAddr, peers.Static)
_, err = d.node.AddPeer(peerAddr, peerstore.Static)
if err != nil {
writeStoreError(w, http.StatusInternalServerError, err)
return

View File

@ -11,7 +11,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/peers"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb"
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
@ -83,7 +83,7 @@ func TestFilterSubscription(t *testing.T) {
break
}
_, err = d.node.AddPeer(addr, peers.Static, legacy_filter.FilterID_v20beta1)
_, err = d.node.AddPeer(addr, peerstore.Static, legacy_filter.FilterID_v20beta1)
require.NoError(t, err)
args := &FilterContentArgs{Topic: testTopic, ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: "ct"}}}

View File

@ -107,10 +107,10 @@ require (
github.com/tklauser/numcpus v0.2.2 // indirect
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 // indirect
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230628220917-7b4e5ae4c0e7 // indirect
github.com/waku-org/go-zerokit-rln v0.1.12 // indirect
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230331231302-258cacb91327 // indirect
github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230331223149-f90e66aebb0d // indirect
github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230331181847-cba74520bae9 // indirect
github.com/waku-org/go-zerokit-rln v0.1.13-0.20230726180145-0496a42e60fb // indirect
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230726162122-13b66414cd5b // indirect
github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230726162204-c48a56712ef0 // indirect
github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230726162310-d761ca9911d8 // indirect
github.com/wk8/go-ordered-map v1.0.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.uber.org/atomic v1.11.0 // indirect

View File

@ -658,14 +658,14 @@ github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 h1:xwY0kW5XZF
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230628220917-7b4e5ae4c0e7 h1:0e1h+p84yBp0IN7AqgbZlV7lgFBjm214lgSOE7CeJmE=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230628220917-7b4e5ae4c0e7/go.mod h1:pFvOZ9YTFsW0o5zJW7a0B5tr1owAijRWJctXJ2toL04=
github.com/waku-org/go-zerokit-rln v0.1.12 h1:66+tU6sTlmUpuUlEv7kCFOGZ37MwZYFJBXHcm8QquwU=
github.com/waku-org/go-zerokit-rln v0.1.12/go.mod h1:MUW+wB6Yj7UBMdZrhko7oHfUZeY2wchggXYjpUiMoac=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230331231302-258cacb91327 h1:Q5XQqo+PEmvrybT8D7BEsKCwIYDi80s+00Q49cfm9Gs=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230331231302-258cacb91327/go.mod h1:KYykqtdApHVYZ3G0spwMnoxc5jH5eI3jyO9SwsSfi48=
github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230331223149-f90e66aebb0d h1:Kcg85Y2xGU6hqZ/kMfkLQF2jAog8vt+tw1/VNidzNtE=
github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230331223149-f90e66aebb0d/go.mod h1:7cSGUoGVIla1IpnChrLbkVjkYgdOcr7rcifEfh4ReR4=
github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230331181847-cba74520bae9 h1:u+YUlWDltHiK5upSb7M6mStc84zdc4vTCNNOz7R5RaY=
github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230331181847-cba74520bae9/go.mod h1:+LeEYoW5/uBUTVjtBGLEVCUe9mOYAlu5ZPkIxLOSr5Y=
github.com/waku-org/go-zerokit-rln v0.1.13-0.20230726180145-0496a42e60fb h1:pxPRTh2DWCPCC5dhFisHuBCm1k54fMtR8VR6hUWD734=
github.com/waku-org/go-zerokit-rln v0.1.13-0.20230726180145-0496a42e60fb/go.mod h1:QYTnrByLh6OXvMzSvPNs5aykT/w4fQb4krGcZfKgSZw=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230726162122-13b66414cd5b h1:wWs8b91SVrxYy37gdNnFDCbjv1hMUHMTwaJUktyjrJE=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230726162122-13b66414cd5b/go.mod h1:KYykqtdApHVYZ3G0spwMnoxc5jH5eI3jyO9SwsSfi48=
github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230726162204-c48a56712ef0 h1:JU5aMzRFeyG/DOiMwLy3F1AMuuXjzPrUKZpW72kAHxE=
github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230726162204-c48a56712ef0/go.mod h1:7cSGUoGVIla1IpnChrLbkVjkYgdOcr7rcifEfh4ReR4=
github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230726162310-d761ca9911d8 h1:pQmTryFdSQuUe8dxt/dHgEfRdLwqf1DEGeReuMcJ9Yg=
github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230726162310-d761ca9911d8/go.mod h1:+LeEYoW5/uBUTVjtBGLEVCUe9mOYAlu5ZPkIxLOSr5Y=
github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/wk8/go-ordered-map v1.0.0 h1:BV7z+2PaK8LTSd/mWgY12HyMAo5CEgkHqbkVq2thqr8=
github.com/wk8/go-ordered-map v1.0.0/go.mod h1:9ZIbRunKbuvfPKyBP1SIKLcXNlv74YCOZ3t3VTS6gRk=

View File

@ -29,7 +29,7 @@ import (
dbutils "github.com/waku-org/go-waku/waku/persistence/utils"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/payload"
"github.com/waku-org/go-waku/waku/v2/peers"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
@ -256,7 +256,7 @@ func AddPeer(address string, protocolID string) string {
return MakeJSONResponse(err)
}
peerID, err := wakuState.node.AddPeer(ma, peers.Static, libp2pProtocol.ID(protocolID))
peerID, err := wakuState.node.AddPeer(ma, peerstore.Static, libp2pProtocol.ID(protocolID))
return PrepareJSONResponse(peerID, err)
}

View File

@ -17,8 +17,8 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
"github.com/multiformats/go-multiaddr"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/peers"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)
@ -107,7 +107,7 @@ func MakeHost(ctx context.Context, port int, randomness io.Reader) (host.Host, e
return nil, err
}
psWrapper := peers.NewWakuPeerstore(ps)
psWrapper := peerstore.NewWakuPeerstore(ps)
if err != nil {
return nil, err
}
@ -138,19 +138,19 @@ func RandomHex(n int) (string, error) {
type TestPeerDiscoverer struct {
sync.RWMutex
peerMap map[peer.ID]struct{}
peerCh chan v2.PeerData
peerCh chan peermanager.PeerData
}
func NewTestPeerDiscoverer() *TestPeerDiscoverer {
result := &TestPeerDiscoverer{
peerMap: make(map[peer.ID]struct{}),
peerCh: make(chan v2.PeerData, 10),
peerCh: make(chan peermanager.PeerData, 10),
}
return result
}
func (t *TestPeerDiscoverer) Subscribe(ctx context.Context, ch <-chan v2.PeerData) {
func (t *TestPeerDiscoverer) Subscribe(ctx context.Context, ch <-chan peermanager.PeerData) {
go func() {
for p := range ch {
t.Lock()

View File

@ -15,11 +15,10 @@ import (
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-discover/discover"
"github.com/waku-org/go-waku/logging"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/peers"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/peerstore"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
@ -31,7 +30,7 @@ var ErrNoDiscV5Listener = errors.New("no discv5 listener")
// PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol
type PeerConnector interface {
Subscribe(context.Context, <-chan v2.PeerData)
Subscribe(context.Context, <-chan peermanager.PeerData)
}
type DiscoveryV5 struct {
@ -43,7 +42,7 @@ type DiscoveryV5 struct {
localnode *enode.LocalNode
peerConnector PeerConnector
peerCh chan v2.PeerData
peerCh chan peermanager.PeerData
NAT nat.Interface
log *zap.Logger
@ -203,7 +202,7 @@ func (d *DiscoveryV5) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
d.cancel = cancel
d.peerCh = make(chan v2.PeerData)
d.peerCh = make(chan peermanager.PeerData)
d.peerConnector.Subscribe(ctx, d.peerCh)
err := d.listen(ctx)
@ -423,8 +422,8 @@ func (d *DiscoveryV5) peerLoop(ctx context.Context) error {
defer iterator.Close()
d.Iterate(ctx, iterator, func(n *enode.Node, p peer.AddrInfo) error {
peer := v2.PeerData{
Origin: peers.Discv5,
peer := peermanager.PeerData{
Origin: peerstore.Discv5,
AddrInfo: p,
ENR: n,
}

View File

@ -16,6 +16,8 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"go.opencensus.io/stats"
"go.uber.org/zap"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
)
// PeerStatis is a map of peer IDs to supported protocols
@ -66,7 +68,7 @@ func (c ConnectionNotifier) ListenClose(n network.Network, m multiaddr.Multiaddr
// Connected is called when a connection is opened
func (c ConnectionNotifier) Connected(n network.Network, cc network.Conn) {
c.log.Info("peer connected", logging.HostID("peer", cc.RemotePeer()))
c.log.Info("peer connected", logging.HostID("peer", cc.RemotePeer()), zap.String("direction", cc.Stat().Direction.String()))
if c.connNotifCh != nil {
select {
case c.connNotifCh <- PeerConnection{cc.RemotePeer(), true}:
@ -74,6 +76,11 @@ func (c ConnectionNotifier) Connected(n network.Network, cc network.Conn) {
c.log.Warn("subscriber is too slow")
}
}
//TODO: Move this to be stored in Waku's own peerStore
err := c.h.Peerstore().(wps.WakuPeerstore).SetDirection(cc.RemotePeer(), cc.Stat().Direction)
if err != nil {
c.log.Error("Failed to set peer direction for an outgoing connection", zap.Error(err))
}
stats.Record(c.ctx, metrics.Peers.M(1))
}

View File

@ -30,10 +30,10 @@ import (
"go.opencensus.io/stats"
"github.com/waku-org/go-waku/logging"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/discv5"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/peers"
"github.com/waku-org/go-waku/waku/v2/peermanager"
peerstore1 "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
@ -82,7 +82,7 @@ type WakuNode struct {
timesource timesource.Timesource
peerstore peerstore.Peerstore
peerConnector *v2.PeerConnectionStrategy
peerConnector *peermanager.PeerConnectionStrategy
relay Service
lightPush Service
@ -119,6 +119,8 @@ type WakuNode struct {
connStatusChan chan<- ConnStatus
storeFactory storeFactory
peermanager *peermanager.PeerManager
}
func defaultStoreFactory(w *WakuNode) store.Store {
@ -193,14 +195,14 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
// Setup peerstore wrapper
if params.peerstore != nil {
w.peerstore = peers.NewWakuPeerstore(params.peerstore)
w.peerstore = peerstore1.NewWakuPeerstore(params.peerstore)
params.libP2POpts = append(params.libP2POpts, libp2p.Peerstore(w.peerstore))
} else {
ps, err := pstoremem.NewPeerstore()
if err != nil {
return nil, err
}
w.peerstore = peers.NewWakuPeerstore(ps)
w.peerstore = peerstore1.NewWakuPeerstore(ps)
params.libP2POpts = append(params.libP2POpts, libp2p.Peerstore(w.peerstore))
}
@ -241,13 +243,17 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
if err != nil {
w.log.Error("creating localnode", zap.Error(err))
}
//Initialize peer manager.
w.peermanager = peermanager.NewPeerManager(uint(w.opts.maxPeerConnections), w.log)
maxOutPeers := int(w.peermanager.OutRelayPeersTarget)
// Setup peer connection strategy
cacheSize := 600
rngSrc := rand.NewSource(rand.Int63())
minBackoff, maxBackoff := time.Minute, time.Hour
bkf := backoff.NewExponentialBackoff(minBackoff, maxBackoff, backoff.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc))
w.peerConnector, err = v2.NewPeerConnectionStrategy(cacheSize, w.opts.discoveryMinPeers, discoveryConnectTimeout, bkf, w.log)
w.peerConnector, err = peermanager.NewPeerConnectionStrategy(cacheSize, maxOutPeers, discoveryConnectTimeout, bkf, w.log)
if err != nil {
w.log.Error("creating peer connection strategy", zap.Error(err))
}
@ -322,7 +328,7 @@ func (w *WakuNode) watchMultiaddressChanges(ctx context.Context) {
// Start initializes all the protocols that were setup in the WakuNode
func (w *WakuNode) Start(ctx context.Context) error {
connGater := v2.NewConnectionGater(w.log)
connGater := peermanager.NewConnectionGater(w.log)
ctx, cancel := context.WithCancel(ctx)
w.cancel = cancel
@ -394,6 +400,8 @@ func (w *WakuNode) Start(ctx context.Context) error {
if err != nil {
return err
}
w.peermanager.SetHost(host)
w.peermanager.Start(ctx)
}
w.store = w.storeFactory(w)
@ -668,9 +676,9 @@ func (w *WakuNode) startStore(ctx context.Context, sub relay.Subscription) error
return nil
}
func (w *WakuNode) addPeer(info *peer.AddrInfo, origin peers.Origin, protocols ...protocol.ID) error {
func (w *WakuNode) addPeer(info *peer.AddrInfo, origin peerstore1.Origin, protocols ...protocol.ID) error {
w.log.Info("adding peer to peerstore", logging.HostID("peer", info.ID))
err := w.host.Peerstore().(peers.WakuPeerstore).SetOrigin(info.ID, origin)
err := w.host.Peerstore().(peerstore1.WakuPeerstore).SetOrigin(info.ID, origin)
if err != nil {
return err
}
@ -685,7 +693,7 @@ func (w *WakuNode) addPeer(info *peer.AddrInfo, origin peers.Origin, protocols .
}
// AddPeer is used to add a peer and the protocols it support to the node peerstore
func (w *WakuNode) AddPeer(address ma.Multiaddr, origin peers.Origin, protocols ...protocol.ID) (peer.ID, error) {
func (w *WakuNode) AddPeer(address ma.Multiaddr, origin peerstore1.Origin, protocols ...protocol.ID) (peer.ID, error) {
info, err := peer.AddrInfoFromP2pAddr(address)
if err != nil {
return "", err
@ -727,11 +735,11 @@ func (w *WakuNode) DialPeerWithInfo(ctx context.Context, peerInfo peer.AddrInfo)
func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error {
err := w.host.Connect(ctx, info)
if err != nil {
w.host.Peerstore().(peers.WakuPeerstore).AddConnFailure(info)
w.host.Peerstore().(peerstore1.WakuPeerstore).AddConnFailure(info)
return err
}
w.host.Peerstore().(peers.WakuPeerstore).ResetConnFailures(info)
w.host.Peerstore().(peerstore1.WakuPeerstore).ResetConnFailures(info)
stats.Record(ctx, metrics.Dials.M(1))
return nil
}
@ -842,7 +850,6 @@ func (w *WakuNode) findRelayNodes(ctx context.Context) {
for _, p := range peers {
info := w.Host().Peerstore().PeerInfo(p.ID)
supportedProtocols, err := w.Host().Peerstore().SupportsProtocols(p.ID, proto.ProtoIDv2Hop)
if err != nil {
w.log.Error("could not check supported protocols", zap.Error(err))

View File

@ -83,7 +83,7 @@ type WakuNodeParameters struct {
enableRendezvousPoint bool
rendezvousDB *rendezvous.DB
discoveryMinPeers int
maxPeerConnections int
enableDiscV5 bool
udpPort uint
@ -120,7 +120,7 @@ type WakuNodeOption func(*WakuNodeParameters) error
// Default options used in the libp2p node
var DefaultWakuNodeOptions = []WakuNodeOption{
WithDiscoverParams(150),
WithMaxPeerConnections(50),
}
// MultiAddresses return the list of multiaddresses configured in the node
@ -332,9 +332,9 @@ func WithWakuRelayAndMinPeers(minRelayPeersToPublish int, opts ...pubsub.Option)
}
}
func WithDiscoverParams(minPeers int) WakuNodeOption {
func WithMaxPeerConnections(maxPeers int) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.discoveryMinPeers = minPeers
params.maxPeerConnections = maxPeers
return nil
}
}

View File

@ -1,4 +1,4 @@
package v2
package peermanager
import (
"runtime"

View File

@ -1,4 +1,4 @@
package v2
package peermanager
// Adapted from github.com/libp2p/go-libp2p@v0.23.2/p2p/discovery/backoff/backoffconnector.go
@ -16,13 +16,20 @@ import (
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/peers"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
"go.uber.org/zap"
lru "github.com/hashicorp/golang-lru"
)
// PeerData contains information about a peer useful in establishing connections with it.
type PeerData struct {
Origin wps.Origin
AddrInfo peer.AddrInfo
ENR *enode.Node
}
// PeerConnectionStrategy is a utility to connect to peers, but only if we have not recently tried connecting to them already
type PeerConnectionStrategy struct {
sync.RWMutex
@ -36,7 +43,7 @@ type PeerConnectionStrategy struct {
workerCancel context.CancelFunc
wg sync.WaitGroup
minPeers int
maxOutPeers int
dialTimeout time.Duration
peerCh chan PeerData
dialCh chan peer.AddrInfo
@ -52,7 +59,7 @@ type PeerConnectionStrategy struct {
// dialTimeout is how long we attempt to connect to a peer before giving up
// minPeers is the minimum number of peers that the node should have
// backoff describes the strategy used to decide how long to backoff after previously attempting to connect to a peer
func NewPeerConnectionStrategy(cacheSize int, minPeers int, dialTimeout time.Duration, backoff backoff.BackoffFactory, logger *zap.Logger) (*PeerConnectionStrategy, error) {
func NewPeerConnectionStrategy(cacheSize int, maxOutPeers int, dialTimeout time.Duration, backoff backoff.BackoffFactory, logger *zap.Logger) (*PeerConnectionStrategy, error) {
cache, err := lru.New2Q(cacheSize)
if err != nil {
return nil, err
@ -61,7 +68,7 @@ func NewPeerConnectionStrategy(cacheSize int, minPeers int, dialTimeout time.Dur
return &PeerConnectionStrategy{
cache: cache,
wg: sync.WaitGroup{},
minPeers: minPeers,
maxOutPeers: maxOutPeers,
dialTimeout: dialTimeout,
backoff: backoff,
logger: logger.Named("discovery-connector"),
@ -73,12 +80,6 @@ type connCacheData struct {
strat backoff.BackoffStrategy
}
type PeerData struct {
Origin peers.Origin
AddrInfo peer.AddrInfo
ENR *enode.Node
}
// Subscribe receives channels on which discovered peers should be pushed
func (c *PeerConnectionStrategy) Subscribe(ctx context.Context, ch <-chan PeerData) {
if c.cancel != nil {
@ -171,13 +172,18 @@ func (c *PeerConnectionStrategy) shouldDialPeers(ctx context.Context) {
return
case <-ticker.C:
isPaused := c.isPaused()
numPeers := len(c.host.Network().Peers())
if numPeers >= c.minPeers && !isPaused {
_, outRelayPeers, err := c.host.Peerstore().(wps.WakuPeerstore).GroupPeersByDirection()
if err != nil {
c.logger.Info("Failed to get outRelayPeers from peerstore", zap.Error(err))
continue
}
numPeers := outRelayPeers.Len()
if numPeers >= c.maxOutPeers && !isPaused {
c.Lock()
c.paused = true
c.workerCancel()
c.Unlock()
} else if numPeers < c.minPeers && isPaused {
} else if numPeers < c.maxOutPeers && isPaused {
c.Lock()
c.paused = false
c.workerCtx, c.workerCancel = context.WithCancel(ctx)
@ -220,13 +226,13 @@ func (c *PeerConnectionStrategy) workPublisher(ctx context.Context) {
return
case p := <-c.peerCh:
c.host.Peerstore().AddAddrs(p.AddrInfo.ID, p.AddrInfo.Addrs, peerstore.AddressTTL)
err := c.host.Peerstore().(peers.WakuPeerstore).SetOrigin(p.AddrInfo.ID, p.Origin)
err := c.host.Peerstore().(wps.WakuPeerstore).SetOrigin(p.AddrInfo.ID, p.Origin)
if err != nil {
c.logger.Error("could not set origin", zap.Error(err), logging.HostID("peer", p.AddrInfo.ID))
}
if p.ENR != nil {
err = c.host.Peerstore().(peers.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR)
err = c.host.Peerstore().(wps.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR)
if err != nil {
c.logger.Error("could not store enr", zap.Error(err), logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String()))
}
@ -250,7 +256,7 @@ const maxActiveDials = 5
func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) {
defer c.wg.Done()
maxGoRoutines := c.minPeers
maxGoRoutines := c.maxOutPeers
if maxGoRoutines > maxActiveDials {
maxGoRoutines = maxActiveDials
}
@ -299,7 +305,7 @@ func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) {
defer cancel()
err := c.host.Connect(ctx, pi)
if err != nil && !errors.Is(err, context.Canceled) {
c.host.Peerstore().(peers.WakuPeerstore).AddConnFailure(pi)
c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi)
c.logger.Info("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err))
}
<-sem

View File

@ -0,0 +1,122 @@
package peermanager
import (
"context"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
"go.uber.org/zap"
)
// TODO: Move all the protocol IDs to a common location.
// WakuRelayIDv200 is protocol ID for Waku v2 relay protocol
const WakuRelayIDv200 = protocol.ID("/vac/waku/relay/2.0.0")
// PeerManager applies various controls and manage connections towards peers.
type PeerManager struct {
maxRelayPeers uint
logger *zap.Logger
InRelayPeersTarget uint
OutRelayPeersTarget uint
host host.Host
}
const maxRelayPeersShare = 5
// const defaultMaxOutRelayPeersTarget = 10
const outRelayPeersShare = 3
const peerConnectivityLoopSecs = 15
// NewPeerManager creates a new peerManager instance.
func NewPeerManager(maxConnections uint, logger *zap.Logger) *PeerManager {
maxRelayPeersValue := maxConnections - (maxConnections / maxRelayPeersShare)
outRelayPeersTargetValue := uint(maxRelayPeersValue / outRelayPeersShare)
pm := &PeerManager{
logger: logger.Named("peer-manager"),
maxRelayPeers: maxRelayPeersValue,
InRelayPeersTarget: maxRelayPeersValue - outRelayPeersTargetValue,
OutRelayPeersTarget: outRelayPeersTargetValue,
}
logger.Info("PeerManager init values", zap.Uint("maxConnections", maxConnections),
zap.Uint("maxRelayPeersValue", maxRelayPeersValue), zap.Uint("outRelayPeersTargetValue", outRelayPeersTargetValue),
zap.Uint("inRelayPeersTarget", pm.InRelayPeersTarget))
return pm
}
func (pm *PeerManager) SetHost(host host.Host) {
pm.host = host
}
// Start starts the processing to be done by peer manager.
func (pm *PeerManager) Start(ctx context.Context) {
go pm.connectivityLoop(ctx)
}
// This is a connectivity loop, which currently checks and prunes inbound connections.
func (pm *PeerManager) connectivityLoop(ctx context.Context) {
t := time.NewTicker(peerConnectivityLoopSecs * time.Second)
for {
select {
case <-ctx.Done():
return
case <-t.C:
pm.pruneInRelayConns()
}
}
}
func (pm *PeerManager) filterPeersByProto(peers peer.IDSlice, proto ...protocol.ID) peer.IDSlice {
var filteredPeers peer.IDSlice
//TODO: This can be optimized once we have waku's own peerStore
for _, p := range peers {
supportedProtocols, err := pm.host.Peerstore().SupportsProtocols(p, proto...)
if err != nil {
pm.logger.Warn("Failed to get supported protocols for peer", zap.String("peerID", p.String()))
continue
}
if len(supportedProtocols) != 0 {
filteredPeers = append(filteredPeers, p)
}
}
return filteredPeers
}
func (pm *PeerManager) pruneInRelayConns() {
var inRelayPeers peer.IDSlice
//Group peers by their connected direction inbound or outbound.
inPeers, outPeers, err := pm.host.Peerstore().(wps.WakuPeerstore).GroupPeersByDirection()
if err != nil {
return
}
pm.logger.Info("Number of peers connected", zap.Int("inPeers", inPeers.Len()), zap.Int("outPeers", outPeers.Len()))
//Need to filter peers to check if they support relay
inRelayPeers = pm.filterPeersByProto(inPeers, WakuRelayIDv200)
outRelayPeers := pm.filterPeersByProto(outPeers, WakuRelayIDv200)
pm.logger.Info("Number of Relay peers connected", zap.Int("inRelayPeers", inRelayPeers.Len()), zap.Int("outRelayPeers", outRelayPeers.Len()))
if inRelayPeers.Len() > int(pm.InRelayPeersTarget) {
//Start disconnecting peers, based on what?
//For now, just disconnect most recently connected peers
//TODO: Need to have more intelligent way of doing this, maybe peer scores.
pm.logger.Info("Number of in peer connections exceed targer relay peers, hence pruning", zap.Int("inRelayPeers", inRelayPeers.Len()), zap.Uint("inRelayPeersTarget", pm.InRelayPeersTarget))
for pruningStartIndex := pm.InRelayPeersTarget; pruningStartIndex < uint(inRelayPeers.Len()); pruningStartIndex++ {
p := inRelayPeers[pruningStartIndex]
err := pm.host.Network().ClosePeer(p)
if err != nil {
pm.logger.Warn("Failed to disconnect connection towards peer", zap.String("peerID", p.String()))
}
pm.host.Peerstore().RemovePeer(p) //TODO: Should we remove the peer immediately?
pm.logger.Info("Successfully disconnected connection towards peer", zap.String("peerID", p.String()))
}
}
}

View File

@ -1,4 +1,4 @@
package peers
package peerstore
import (
"context"

View File

@ -1,13 +1,16 @@
package peers
package peerstore
import (
"sync"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
)
// Origin is used to determine how the peer is identified,
// either it is statically added or discovered via one of the discovery protocols
type Origin int64
const (
@ -21,17 +24,21 @@ const (
const peerOrigin = "origin"
const peerENR = "enr"
const peerDirection = "direction"
// ConnectionFailures contains connection failure information towards all peers
type ConnectionFailures struct {
sync.RWMutex
failures map[peer.ID]int
}
// WakuPeerstoreImpl is a implementation of WakuPeerStore
type WakuPeerstoreImpl struct {
peerStore peerstore.Peerstore
connFailures ConnectionFailures
}
// WakuPeerstore is an interface for implementing WakuPeerStore
type WakuPeerstore interface {
SetOrigin(p peer.ID, origin Origin) error
Origin(p peer.ID, origin Origin) (Origin, error)
@ -41,8 +48,13 @@ type WakuPeerstore interface {
AddConnFailure(p peer.AddrInfo)
ResetConnFailures(p peer.AddrInfo)
ConnFailures(p peer.AddrInfo) int
SetDirection(p peer.ID, direction network.Direction) error
Direction(p peer.ID) (network.Direction, error)
GroupPeersByDirection() (inPeers peer.IDSlice, outPeers peer.IDSlice, err error)
}
// NewWakuPeerstore creates a new WakuPeerStore object
func NewWakuPeerstore(p peerstore.Peerstore) peerstore.Peerstore {
return &WakuPeerstoreImpl{
peerStore: p,
@ -52,10 +64,12 @@ func NewWakuPeerstore(p peerstore.Peerstore) peerstore.Peerstore {
}
}
// SetOrigin sets origin for a specific peer.
func (ps *WakuPeerstoreImpl) SetOrigin(p peer.ID, origin Origin) error {
return ps.peerStore.Put(p, peerOrigin, origin)
}
// Origin fetches the origin for a specific peer.
func (ps *WakuPeerstoreImpl) Origin(p peer.ID, origin Origin) (Origin, error) {
result, err := ps.peerStore.Get(p, peerOrigin)
if err != nil {
@ -65,6 +79,7 @@ func (ps *WakuPeerstoreImpl) Origin(p peer.ID, origin Origin) (Origin, error) {
return result.(Origin), nil
}
// PeersByOrigin returns the list of peers for a specific origin
func (ps *WakuPeerstoreImpl) PeersByOrigin(origin Origin) peer.IDSlice {
var result peer.IDSlice
for _, p := range ps.Peers() {
@ -76,10 +91,12 @@ func (ps *WakuPeerstoreImpl) PeersByOrigin(origin Origin) peer.IDSlice {
return result
}
// SetENR sets the ENR record a peer
func (ps *WakuPeerstoreImpl) SetENR(p peer.ID, enr *enode.Node) error {
return ps.peerStore.Put(p, peerENR, enr)
}
// ENR fetches the ENR record for a peer
func (ps *WakuPeerstoreImpl) ENR(p peer.ID, origin Origin) (*enode.Node, error) {
result, err := ps.peerStore.Get(p, peerENR)
if err != nil {
@ -88,20 +105,54 @@ func (ps *WakuPeerstoreImpl) ENR(p peer.ID, origin Origin) (*enode.Node, error)
return result.(*enode.Node), nil
}
// AddConnFailure increments connectionFailures for a peer
func (ps *WakuPeerstoreImpl) AddConnFailure(p peer.AddrInfo) {
ps.connFailures.Lock()
defer ps.connFailures.Unlock()
ps.connFailures.failures[p.ID]++
}
// ResetConnFailures resets connectionFailures for a peer to 0
func (ps *WakuPeerstoreImpl) ResetConnFailures(p peer.AddrInfo) {
ps.connFailures.Lock()
defer ps.connFailures.Unlock()
ps.connFailures.failures[p.ID] = 0
}
// ConnFailures fetches connectionFailures for a peer
func (ps *WakuPeerstoreImpl) ConnFailures(p peer.AddrInfo) int {
ps.connFailures.RLock()
defer ps.connFailures.RUnlock()
return ps.connFailures.failures[p.ID]
}
// SetDirection sets connection direction for a specific peer.
func (ps *WakuPeerstoreImpl) SetDirection(p peer.ID, direction network.Direction) error {
return ps.peerStore.Put(p, peerDirection, direction)
}
// Direction fetches the connection direction (Inbound or outBound) for a specific peer
func (ps *WakuPeerstoreImpl) Direction(p peer.ID) (network.Direction, error) {
result, err := ps.peerStore.Get(p, peerDirection)
if err != nil {
return network.DirUnknown, err
}
return result.(network.Direction), nil
}
// GroupPeersByDirection returns all the peers in peer store grouped by Inbound or outBound direction
func (ps *WakuPeerstoreImpl) GroupPeersByDirection() (inPeers peer.IDSlice, outPeers peer.IDSlice, err error) {
for _, p := range ps.Peers() {
direction, err := ps.Direction(p)
if err == nil {
if direction == network.DirInbound {
inPeers = append(inPeers, p)
} else if direction == network.DirOutbound {
outPeers = append(outPeers, p)
}
}
}
return inPeers, outPeers, nil
}

View File

@ -36,7 +36,7 @@ type WakuLightPush struct {
log *zap.Logger
}
// NewWakuRelay returns a new instance of Waku Lightpush struct
// NewWakuLightPush returns a new instance of Waku Lightpush struct
func NewWakuLightPush(relay *relay.WakuRelay, log *zap.Logger) *WakuLightPush {
wakuLP := new(WakuLightPush)
wakuLP.relay = relay

View File

@ -10,9 +10,9 @@ import (
"github.com/ethereum/go-ethereum/rlp"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-msgio/pbio"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/peers"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/peerstore"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb"
"go.uber.org/zap"
@ -104,12 +104,12 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb
go func() {
defer wakuPX.wg.Done()
peerCh := make(chan v2.PeerData)
peerCh := make(chan peermanager.PeerData)
defer close(peerCh)
wakuPX.peerConnector.Subscribe(ctx, peerCh)
for _, p := range discoveredPeers {
peer := v2.PeerData{
Origin: peers.PeerExchange,
peer := peermanager.PeerData{
Origin: peerstore.PeerExchange,
AddrInfo: p.addrInfo,
ENR: p.enr,
}

View File

@ -13,9 +13,9 @@ import (
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-msgio/pbio"
"github.com/waku-org/go-waku/logging"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/discv5"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb"
@ -33,7 +33,7 @@ var (
// PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol
type PeerConnector interface {
Subscribe(context.Context, <-chan v2.PeerData)
Subscribe(context.Context, <-chan peermanager.PeerData)
}
type WakuPeerExchange struct {

View File

@ -10,8 +10,8 @@ import (
"github.com/libp2p/go-libp2p/core/host"
rvs "github.com/waku-org/go-libp2p-rendezvous"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/peers"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
"go.uber.org/zap"
)
@ -39,7 +39,7 @@ type Rendezvous struct {
// PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol
type PeerConnector interface {
Subscribe(context.Context, <-chan v2.PeerData)
Subscribe(context.Context, <-chan peermanager.PeerData)
}
// NewRendezvous creates an instance of Rendezvous struct
@ -105,12 +105,12 @@ func (r *Rendezvous) DiscoverWithNamespace(ctx context.Context, namespace string
if len(addrInfo) != 0 {
rp.SetSuccess(cookie)
peerCh := make(chan v2.PeerData)
peerCh := make(chan peermanager.PeerData)
defer close(peerCh)
r.peerConnector.Subscribe(ctx, peerCh)
for _, p := range addrInfo {
peer := v2.PeerData{
Origin: peers.Rendezvous,
peer := peermanager.PeerData{
Origin: peerstore.Rendezvous,
AddrInfo: p,
}
select {

View File

@ -14,17 +14,16 @@ import (
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/persistence/sqlite"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/utils"
)
type PeerConn struct {
ch <-chan v2.PeerData
ch <-chan peermanager.PeerData
}
func (p *PeerConn) Subscribe(ctx context.Context, ch <-chan v2.PeerData) {
func (p *PeerConn) Subscribe(ctx context.Context, ch <-chan peermanager.PeerData) {
p.ch = ch
}
func NewPeerConn() *PeerConn {