From 34de2941c71cf8281b32fbf55630ed647436594a Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Thu, 3 Aug 2023 21:51:15 +0530 Subject: [PATCH] Feat/peer manager (#596) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * chore: refactor existing code into peer maanger package * feat: move peer connection related code into peer manager * feat: in relay peer connection pruning * feat: add max-connections CLI flag and limit outRelayPeers based on max-connections #621 * tested both in and out relay connection limits Co-authored-by: richΛrd * Review comment, use context to pause connectivity loop during node shutdown. Co-authored-by: richΛrd * address review comments --------- Co-authored-by: richΛrd --- cmd/waku/flags.go | 7 + cmd/waku/main.go | 1 + cmd/waku/node.go | 9 +- cmd/waku/options.go | 1 + cmd/waku/rest/store.go | 4 +- cmd/waku/rpc/filter_test.go | 4 +- examples/basic2/go.mod | 8 +- examples/basic2/go.sum | 16 +-- mobile/api.go | 4 +- tests/utils.go | 12 +- waku/v2/discv5/discover.go | 15 +-- waku/v2/node/connectedness.go | 9 +- waku/v2/node/wakunode2.go | 33 +++-- waku/v2/node/wakuoptions.go | 8 +- waku/v2/{ => peermanager}/connection_gater.go | 2 +- .../{ => peermanager}/discovery_connector.go | 42 +++--- waku/v2/peermanager/peer_manager.go | 122 ++++++++++++++++++ waku/v2/{peers => peerstore}/inherited.go | 2 +- .../waku_peer_store.go} | 53 +++++++- waku/v2/protocol/lightpush/waku_lightpush.go | 2 +- waku/v2/protocol/peer_exchange/client.go | 10 +- waku/v2/protocol/peer_exchange/protocol.go | 4 +- waku/v2/rendezvous/rendezvous.go | 12 +- waku/v2/rendezvous/rendezvous_test.go | 7 +- 24 files changed, 294 insertions(+), 93 deletions(-) rename waku/v2/{ => peermanager}/connection_gater.go (99%) rename waku/v2/{ => peermanager}/discovery_connector.go (86%) create mode 100644 waku/v2/peermanager/peer_manager.go rename waku/v2/{peers => peerstore}/inherited.go (99%) rename waku/v2/{peers/peerstore.go => peerstore/waku_peer_store.go} (53%) diff --git a/cmd/waku/flags.go b/cmd/waku/flags.go index 3c94cd47..dffff8e5 100644 --- a/cmd/waku/flags.go +++ b/cmd/waku/flags.go @@ -25,6 +25,13 @@ var ( Destination: &options.Address, EnvVars: []string{"WAKUNODE2_ADDRESS"}, }) + MaxPeerConnections = altsrc.NewIntFlag(&cli.IntFlag{ + Name: "max-connections", + Value: 50, + Usage: "Maximum allowed number of libp2p connections.", + Destination: &options.MaxPeerConnections, + EnvVars: []string{"WAKUNODE2_MAX_CONNECTIONS"}, + }) WebsocketSupport = altsrc.NewBoolFlag(&cli.BoolFlag{ Name: "websocket-support", Aliases: []string{"ws"}, diff --git a/cmd/waku/main.go b/cmd/waku/main.go index 8df1cab6..a3de1d74 100644 --- a/cmd/waku/main.go +++ b/cmd/waku/main.go @@ -19,6 +19,7 @@ func main() { &cli.StringFlag{Name: "config-file", Usage: "loads configuration from a TOML file (cmd-line parameters take precedence)"}, TcpPort, Address, + MaxPeerConnections, WebsocketSupport, WebsocketPort, WebsocketSecurePort, diff --git a/cmd/waku/node.go b/cmd/waku/node.go index 7be2616b..77f0a457 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -20,7 +20,7 @@ import ( "github.com/waku-org/go-waku/waku/persistence/sqlite" dbutils "github.com/waku-org/go-waku/waku/persistence/utils" wmetrics "github.com/waku-org/go-waku/waku/v2/metrics" - "github.com/waku-org/go-waku/waku/v2/peers" + wakupeerstore "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/rendezvous" "github.com/ethereum/go-ethereum/accounts/keystore" @@ -129,6 +129,7 @@ func Execute(options Options) { node.WithPrivateKey(prvKey), node.WithHostAddress(hostAddr), node.WithKeepAlive(options.KeepAlive), + node.WithMaxPeerConnections(options.MaxPeerConnections), } if len(options.AdvertiseAddresses) != 0 { nodeOpts = append(nodeOpts, node.WithAdvertiseAddresses(options.AdvertiseAddresses...)) @@ -403,7 +404,7 @@ func Execute(options Options) { if options.PeerExchange.Enable && options.PeerExchange.Node != nil { logger.Info("retrieving peer info via peer exchange protocol") - peerID, err := wakuNode.AddPeer(*options.PeerExchange.Node, peers.Static, peer_exchange.PeerExchangeID_v20alpha1) + peerID, err := wakuNode.AddPeer(*options.PeerExchange.Node, wakupeerstore.Static, peer_exchange.PeerExchangeID_v20alpha1) if err != nil { logger.Error("adding peer exchange peer", logging.MultiAddrs("node", *options.PeerExchange.Node), zap.Error(err)) } else { @@ -434,7 +435,7 @@ func Execute(options Options) { var peerIDs []peer.ID for _, n := range options.Store.ResumeNodes { - pID, err := wakuNode.AddPeer(n, peers.Static, store.StoreID_v20beta4) + pID, err := wakuNode.AddPeer(n, wakupeerstore.Static, store.StoreID_v20beta4) if err != nil { logger.Warn("adding peer to peerstore", logging.MultiAddrs("peer", n), zap.Error(err)) } @@ -502,7 +503,7 @@ func Execute(options Options) { func addStaticPeers(wakuNode *node.WakuNode, addresses []multiaddr.Multiaddr, protocols ...protocol.ID) { for _, addr := range addresses { - _, err := wakuNode.AddPeer(addr, peers.Static, protocols...) + _, err := wakuNode.AddPeer(addr, wakupeerstore.Static, protocols...) failOnErr(err, "error adding peer") } } diff --git a/cmd/waku/options.go b/cmd/waku/options.go index d9281a96..82845489 100644 --- a/cmd/waku/options.go +++ b/cmd/waku/options.go @@ -170,6 +170,7 @@ type Options struct { PersistPeers bool UserAgent string PProf bool + MaxPeerConnections int PeerExchange PeerExchangeOptions Websocket WSOptions diff --git a/cmd/waku/rest/store.go b/cmd/waku/rest/store.go index 00656995..bb628f6e 100644 --- a/cmd/waku/rest/store.go +++ b/cmd/waku/rest/store.go @@ -12,7 +12,7 @@ import ( "github.com/go-chi/chi/v5" "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/peers" + "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/utils" @@ -197,7 +197,7 @@ func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) defer cancel() - _, err = d.node.AddPeer(peerAddr, peers.Static) + _, err = d.node.AddPeer(peerAddr, peerstore.Static) if err != nil { writeStoreError(w, http.StatusInternalServerError, err) return diff --git a/cmd/waku/rpc/filter_test.go b/cmd/waku/rpc/filter_test.go index f5ff0ea4..0ea6d4eb 100644 --- a/cmd/waku/rpc/filter_test.go +++ b/cmd/waku/rpc/filter_test.go @@ -11,7 +11,7 @@ import ( "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/peers" + "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" @@ -83,7 +83,7 @@ func TestFilterSubscription(t *testing.T) { break } - _, err = d.node.AddPeer(addr, peers.Static, legacy_filter.FilterID_v20beta1) + _, err = d.node.AddPeer(addr, peerstore.Static, legacy_filter.FilterID_v20beta1) require.NoError(t, err) args := &FilterContentArgs{Topic: testTopic, ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: "ct"}}} diff --git a/examples/basic2/go.mod b/examples/basic2/go.mod index bdfe10f2..6a496f67 100644 --- a/examples/basic2/go.mod +++ b/examples/basic2/go.mod @@ -107,10 +107,10 @@ require ( github.com/tklauser/numcpus v0.2.2 // indirect github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 // indirect github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230628220917-7b4e5ae4c0e7 // indirect - github.com/waku-org/go-zerokit-rln v0.1.12 // indirect - github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230331231302-258cacb91327 // indirect - github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230331223149-f90e66aebb0d // indirect - github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230331181847-cba74520bae9 // indirect + github.com/waku-org/go-zerokit-rln v0.1.13-0.20230726180145-0496a42e60fb // indirect + github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230726162122-13b66414cd5b // indirect + github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230726162204-c48a56712ef0 // indirect + github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230726162310-d761ca9911d8 // indirect github.com/wk8/go-ordered-map v1.0.0 // indirect go.opencensus.io v0.24.0 // indirect go.uber.org/atomic v1.11.0 // indirect diff --git a/examples/basic2/go.sum b/examples/basic2/go.sum index a7a55bfe..95f2f425 100644 --- a/examples/basic2/go.sum +++ b/examples/basic2/go.sum @@ -658,14 +658,14 @@ github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 h1:xwY0kW5XZF github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230628220917-7b4e5ae4c0e7 h1:0e1h+p84yBp0IN7AqgbZlV7lgFBjm214lgSOE7CeJmE= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230628220917-7b4e5ae4c0e7/go.mod h1:pFvOZ9YTFsW0o5zJW7a0B5tr1owAijRWJctXJ2toL04= -github.com/waku-org/go-zerokit-rln v0.1.12 h1:66+tU6sTlmUpuUlEv7kCFOGZ37MwZYFJBXHcm8QquwU= -github.com/waku-org/go-zerokit-rln v0.1.12/go.mod h1:MUW+wB6Yj7UBMdZrhko7oHfUZeY2wchggXYjpUiMoac= -github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230331231302-258cacb91327 h1:Q5XQqo+PEmvrybT8D7BEsKCwIYDi80s+00Q49cfm9Gs= -github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230331231302-258cacb91327/go.mod h1:KYykqtdApHVYZ3G0spwMnoxc5jH5eI3jyO9SwsSfi48= -github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230331223149-f90e66aebb0d h1:Kcg85Y2xGU6hqZ/kMfkLQF2jAog8vt+tw1/VNidzNtE= -github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230331223149-f90e66aebb0d/go.mod h1:7cSGUoGVIla1IpnChrLbkVjkYgdOcr7rcifEfh4ReR4= -github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230331181847-cba74520bae9 h1:u+YUlWDltHiK5upSb7M6mStc84zdc4vTCNNOz7R5RaY= -github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230331181847-cba74520bae9/go.mod h1:+LeEYoW5/uBUTVjtBGLEVCUe9mOYAlu5ZPkIxLOSr5Y= +github.com/waku-org/go-zerokit-rln v0.1.13-0.20230726180145-0496a42e60fb h1:pxPRTh2DWCPCC5dhFisHuBCm1k54fMtR8VR6hUWD734= +github.com/waku-org/go-zerokit-rln v0.1.13-0.20230726180145-0496a42e60fb/go.mod h1:QYTnrByLh6OXvMzSvPNs5aykT/w4fQb4krGcZfKgSZw= +github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230726162122-13b66414cd5b h1:wWs8b91SVrxYy37gdNnFDCbjv1hMUHMTwaJUktyjrJE= +github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230726162122-13b66414cd5b/go.mod h1:KYykqtdApHVYZ3G0spwMnoxc5jH5eI3jyO9SwsSfi48= +github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230726162204-c48a56712ef0 h1:JU5aMzRFeyG/DOiMwLy3F1AMuuXjzPrUKZpW72kAHxE= +github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230726162204-c48a56712ef0/go.mod h1:7cSGUoGVIla1IpnChrLbkVjkYgdOcr7rcifEfh4ReR4= +github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230726162310-d761ca9911d8 h1:pQmTryFdSQuUe8dxt/dHgEfRdLwqf1DEGeReuMcJ9Yg= +github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230726162310-d761ca9911d8/go.mod h1:+LeEYoW5/uBUTVjtBGLEVCUe9mOYAlu5ZPkIxLOSr5Y= github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/wk8/go-ordered-map v1.0.0 h1:BV7z+2PaK8LTSd/mWgY12HyMAo5CEgkHqbkVq2thqr8= github.com/wk8/go-ordered-map v1.0.0/go.mod h1:9ZIbRunKbuvfPKyBP1SIKLcXNlv74YCOZ3t3VTS6gRk= diff --git a/mobile/api.go b/mobile/api.go index 2b01549b..f175918a 100644 --- a/mobile/api.go +++ b/mobile/api.go @@ -29,7 +29,7 @@ import ( dbutils "github.com/waku-org/go-waku/waku/persistence/utils" "github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/payload" - "github.com/waku-org/go-waku/waku/v2/peers" + "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/utils" @@ -256,7 +256,7 @@ func AddPeer(address string, protocolID string) string { return MakeJSONResponse(err) } - peerID, err := wakuState.node.AddPeer(ma, peers.Static, libp2pProtocol.ID(protocolID)) + peerID, err := wakuState.node.AddPeer(ma, peerstore.Static, libp2pProtocol.ID(protocolID)) return PrepareJSONResponse(peerID, err) } diff --git a/tests/utils.go b/tests/utils.go index 0f3a623c..26350b54 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -17,8 +17,8 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" "github.com/multiformats/go-multiaddr" - v2 "github.com/waku-org/go-waku/waku/v2" - "github.com/waku-org/go-waku/waku/v2/peers" + "github.com/waku-org/go-waku/waku/v2/peermanager" + "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol/pb" ) @@ -107,7 +107,7 @@ func MakeHost(ctx context.Context, port int, randomness io.Reader) (host.Host, e return nil, err } - psWrapper := peers.NewWakuPeerstore(ps) + psWrapper := peerstore.NewWakuPeerstore(ps) if err != nil { return nil, err } @@ -138,19 +138,19 @@ func RandomHex(n int) (string, error) { type TestPeerDiscoverer struct { sync.RWMutex peerMap map[peer.ID]struct{} - peerCh chan v2.PeerData + peerCh chan peermanager.PeerData } func NewTestPeerDiscoverer() *TestPeerDiscoverer { result := &TestPeerDiscoverer{ peerMap: make(map[peer.ID]struct{}), - peerCh: make(chan v2.PeerData, 10), + peerCh: make(chan peermanager.PeerData, 10), } return result } -func (t *TestPeerDiscoverer) Subscribe(ctx context.Context, ch <-chan v2.PeerData) { +func (t *TestPeerDiscoverer) Subscribe(ctx context.Context, ch <-chan peermanager.PeerData) { go func() { for p := range ch { t.Lock() diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index f03fa8d3..6cd056c1 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -15,11 +15,10 @@ import ( "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-discover/discover" "github.com/waku-org/go-waku/logging" - v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/metrics" - "github.com/waku-org/go-waku/waku/v2/peers" + "github.com/waku-org/go-waku/waku/v2/peermanager" + "github.com/waku-org/go-waku/waku/v2/peerstore" wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" - "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" @@ -31,7 +30,7 @@ var ErrNoDiscV5Listener = errors.New("no discv5 listener") // PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol type PeerConnector interface { - Subscribe(context.Context, <-chan v2.PeerData) + Subscribe(context.Context, <-chan peermanager.PeerData) } type DiscoveryV5 struct { @@ -43,7 +42,7 @@ type DiscoveryV5 struct { localnode *enode.LocalNode peerConnector PeerConnector - peerCh chan v2.PeerData + peerCh chan peermanager.PeerData NAT nat.Interface log *zap.Logger @@ -203,7 +202,7 @@ func (d *DiscoveryV5) Start(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) d.cancel = cancel - d.peerCh = make(chan v2.PeerData) + d.peerCh = make(chan peermanager.PeerData) d.peerConnector.Subscribe(ctx, d.peerCh) err := d.listen(ctx) @@ -423,8 +422,8 @@ func (d *DiscoveryV5) peerLoop(ctx context.Context) error { defer iterator.Close() d.Iterate(ctx, iterator, func(n *enode.Node, p peer.AddrInfo) error { - peer := v2.PeerData{ - Origin: peers.Discv5, + peer := peermanager.PeerData{ + Origin: peerstore.Discv5, AddrInfo: p, ENR: n, } diff --git a/waku/v2/node/connectedness.go b/waku/v2/node/connectedness.go index 2b866fa4..66b0313e 100644 --- a/waku/v2/node/connectedness.go +++ b/waku/v2/node/connectedness.go @@ -16,6 +16,8 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/store" "go.opencensus.io/stats" "go.uber.org/zap" + + wps "github.com/waku-org/go-waku/waku/v2/peerstore" ) // PeerStatis is a map of peer IDs to supported protocols @@ -66,7 +68,7 @@ func (c ConnectionNotifier) ListenClose(n network.Network, m multiaddr.Multiaddr // Connected is called when a connection is opened func (c ConnectionNotifier) Connected(n network.Network, cc network.Conn) { - c.log.Info("peer connected", logging.HostID("peer", cc.RemotePeer())) + c.log.Info("peer connected", logging.HostID("peer", cc.RemotePeer()), zap.String("direction", cc.Stat().Direction.String())) if c.connNotifCh != nil { select { case c.connNotifCh <- PeerConnection{cc.RemotePeer(), true}: @@ -74,6 +76,11 @@ func (c ConnectionNotifier) Connected(n network.Network, cc network.Conn) { c.log.Warn("subscriber is too slow") } } + //TODO: Move this to be stored in Waku's own peerStore + err := c.h.Peerstore().(wps.WakuPeerstore).SetDirection(cc.RemotePeer(), cc.Stat().Direction) + if err != nil { + c.log.Error("Failed to set peer direction for an outgoing connection", zap.Error(err)) + } stats.Record(c.ctx, metrics.Peers.M(1)) } diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index e415d902..0f554e1d 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -30,10 +30,10 @@ import ( "go.opencensus.io/stats" "github.com/waku-org/go-waku/logging" - v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/discv5" "github.com/waku-org/go-waku/waku/v2/metrics" - "github.com/waku-org/go-waku/waku/v2/peers" + "github.com/waku-org/go-waku/waku/v2/peermanager" + peerstore1 "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter" @@ -82,7 +82,7 @@ type WakuNode struct { timesource timesource.Timesource peerstore peerstore.Peerstore - peerConnector *v2.PeerConnectionStrategy + peerConnector *peermanager.PeerConnectionStrategy relay Service lightPush Service @@ -119,6 +119,8 @@ type WakuNode struct { connStatusChan chan<- ConnStatus storeFactory storeFactory + + peermanager *peermanager.PeerManager } func defaultStoreFactory(w *WakuNode) store.Store { @@ -193,14 +195,14 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { // Setup peerstore wrapper if params.peerstore != nil { - w.peerstore = peers.NewWakuPeerstore(params.peerstore) + w.peerstore = peerstore1.NewWakuPeerstore(params.peerstore) params.libP2POpts = append(params.libP2POpts, libp2p.Peerstore(w.peerstore)) } else { ps, err := pstoremem.NewPeerstore() if err != nil { return nil, err } - w.peerstore = peers.NewWakuPeerstore(ps) + w.peerstore = peerstore1.NewWakuPeerstore(ps) params.libP2POpts = append(params.libP2POpts, libp2p.Peerstore(w.peerstore)) } @@ -241,13 +243,17 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { if err != nil { w.log.Error("creating localnode", zap.Error(err)) } + //Initialize peer manager. + w.peermanager = peermanager.NewPeerManager(uint(w.opts.maxPeerConnections), w.log) + maxOutPeers := int(w.peermanager.OutRelayPeersTarget) // Setup peer connection strategy cacheSize := 600 rngSrc := rand.NewSource(rand.Int63()) minBackoff, maxBackoff := time.Minute, time.Hour bkf := backoff.NewExponentialBackoff(minBackoff, maxBackoff, backoff.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc)) - w.peerConnector, err = v2.NewPeerConnectionStrategy(cacheSize, w.opts.discoveryMinPeers, discoveryConnectTimeout, bkf, w.log) + + w.peerConnector, err = peermanager.NewPeerConnectionStrategy(cacheSize, maxOutPeers, discoveryConnectTimeout, bkf, w.log) if err != nil { w.log.Error("creating peer connection strategy", zap.Error(err)) } @@ -322,7 +328,7 @@ func (w *WakuNode) watchMultiaddressChanges(ctx context.Context) { // Start initializes all the protocols that were setup in the WakuNode func (w *WakuNode) Start(ctx context.Context) error { - connGater := v2.NewConnectionGater(w.log) + connGater := peermanager.NewConnectionGater(w.log) ctx, cancel := context.WithCancel(ctx) w.cancel = cancel @@ -394,6 +400,8 @@ func (w *WakuNode) Start(ctx context.Context) error { if err != nil { return err } + w.peermanager.SetHost(host) + w.peermanager.Start(ctx) } w.store = w.storeFactory(w) @@ -668,9 +676,9 @@ func (w *WakuNode) startStore(ctx context.Context, sub relay.Subscription) error return nil } -func (w *WakuNode) addPeer(info *peer.AddrInfo, origin peers.Origin, protocols ...protocol.ID) error { +func (w *WakuNode) addPeer(info *peer.AddrInfo, origin peerstore1.Origin, protocols ...protocol.ID) error { w.log.Info("adding peer to peerstore", logging.HostID("peer", info.ID)) - err := w.host.Peerstore().(peers.WakuPeerstore).SetOrigin(info.ID, origin) + err := w.host.Peerstore().(peerstore1.WakuPeerstore).SetOrigin(info.ID, origin) if err != nil { return err } @@ -685,7 +693,7 @@ func (w *WakuNode) addPeer(info *peer.AddrInfo, origin peers.Origin, protocols . } // AddPeer is used to add a peer and the protocols it support to the node peerstore -func (w *WakuNode) AddPeer(address ma.Multiaddr, origin peers.Origin, protocols ...protocol.ID) (peer.ID, error) { +func (w *WakuNode) AddPeer(address ma.Multiaddr, origin peerstore1.Origin, protocols ...protocol.ID) (peer.ID, error) { info, err := peer.AddrInfoFromP2pAddr(address) if err != nil { return "", err @@ -727,11 +735,11 @@ func (w *WakuNode) DialPeerWithInfo(ctx context.Context, peerInfo peer.AddrInfo) func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error { err := w.host.Connect(ctx, info) if err != nil { - w.host.Peerstore().(peers.WakuPeerstore).AddConnFailure(info) + w.host.Peerstore().(peerstore1.WakuPeerstore).AddConnFailure(info) return err } - w.host.Peerstore().(peers.WakuPeerstore).ResetConnFailures(info) + w.host.Peerstore().(peerstore1.WakuPeerstore).ResetConnFailures(info) stats.Record(ctx, metrics.Dials.M(1)) return nil } @@ -842,7 +850,6 @@ func (w *WakuNode) findRelayNodes(ctx context.Context) { for _, p := range peers { info := w.Host().Peerstore().PeerInfo(p.ID) - supportedProtocols, err := w.Host().Peerstore().SupportsProtocols(p.ID, proto.ProtoIDv2Hop) if err != nil { w.log.Error("could not check supported protocols", zap.Error(err)) diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 99a47f2f..2c90f702 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -83,7 +83,7 @@ type WakuNodeParameters struct { enableRendezvousPoint bool rendezvousDB *rendezvous.DB - discoveryMinPeers int + maxPeerConnections int enableDiscV5 bool udpPort uint @@ -120,7 +120,7 @@ type WakuNodeOption func(*WakuNodeParameters) error // Default options used in the libp2p node var DefaultWakuNodeOptions = []WakuNodeOption{ - WithDiscoverParams(150), + WithMaxPeerConnections(50), } // MultiAddresses return the list of multiaddresses configured in the node @@ -332,9 +332,9 @@ func WithWakuRelayAndMinPeers(minRelayPeersToPublish int, opts ...pubsub.Option) } } -func WithDiscoverParams(minPeers int) WakuNodeOption { +func WithMaxPeerConnections(maxPeers int) WakuNodeOption { return func(params *WakuNodeParameters) error { - params.discoveryMinPeers = minPeers + params.maxPeerConnections = maxPeers return nil } } diff --git a/waku/v2/connection_gater.go b/waku/v2/peermanager/connection_gater.go similarity index 99% rename from waku/v2/connection_gater.go rename to waku/v2/peermanager/connection_gater.go index 766d0141..fe10e4cc 100644 --- a/waku/v2/connection_gater.go +++ b/waku/v2/peermanager/connection_gater.go @@ -1,4 +1,4 @@ -package v2 +package peermanager import ( "runtime" diff --git a/waku/v2/discovery_connector.go b/waku/v2/peermanager/discovery_connector.go similarity index 86% rename from waku/v2/discovery_connector.go rename to waku/v2/peermanager/discovery_connector.go index 03914e34..d41d8eb7 100644 --- a/waku/v2/discovery_connector.go +++ b/waku/v2/peermanager/discovery_connector.go @@ -1,4 +1,4 @@ -package v2 +package peermanager // Adapted from github.com/libp2p/go-libp2p@v0.23.2/p2p/discovery/backoff/backoffconnector.go @@ -16,13 +16,20 @@ import ( "github.com/libp2p/go-libp2p/p2p/discovery/backoff" "github.com/waku-org/go-waku/logging" - "github.com/waku-org/go-waku/waku/v2/peers" + wps "github.com/waku-org/go-waku/waku/v2/peerstore" "go.uber.org/zap" lru "github.com/hashicorp/golang-lru" ) +// PeerData contains information about a peer useful in establishing connections with it. +type PeerData struct { + Origin wps.Origin + AddrInfo peer.AddrInfo + ENR *enode.Node +} + // PeerConnectionStrategy is a utility to connect to peers, but only if we have not recently tried connecting to them already type PeerConnectionStrategy struct { sync.RWMutex @@ -36,7 +43,7 @@ type PeerConnectionStrategy struct { workerCancel context.CancelFunc wg sync.WaitGroup - minPeers int + maxOutPeers int dialTimeout time.Duration peerCh chan PeerData dialCh chan peer.AddrInfo @@ -52,7 +59,7 @@ type PeerConnectionStrategy struct { // dialTimeout is how long we attempt to connect to a peer before giving up // minPeers is the minimum number of peers that the node should have // backoff describes the strategy used to decide how long to backoff after previously attempting to connect to a peer -func NewPeerConnectionStrategy(cacheSize int, minPeers int, dialTimeout time.Duration, backoff backoff.BackoffFactory, logger *zap.Logger) (*PeerConnectionStrategy, error) { +func NewPeerConnectionStrategy(cacheSize int, maxOutPeers int, dialTimeout time.Duration, backoff backoff.BackoffFactory, logger *zap.Logger) (*PeerConnectionStrategy, error) { cache, err := lru.New2Q(cacheSize) if err != nil { return nil, err @@ -61,7 +68,7 @@ func NewPeerConnectionStrategy(cacheSize int, minPeers int, dialTimeout time.Dur return &PeerConnectionStrategy{ cache: cache, wg: sync.WaitGroup{}, - minPeers: minPeers, + maxOutPeers: maxOutPeers, dialTimeout: dialTimeout, backoff: backoff, logger: logger.Named("discovery-connector"), @@ -73,12 +80,6 @@ type connCacheData struct { strat backoff.BackoffStrategy } -type PeerData struct { - Origin peers.Origin - AddrInfo peer.AddrInfo - ENR *enode.Node -} - // Subscribe receives channels on which discovered peers should be pushed func (c *PeerConnectionStrategy) Subscribe(ctx context.Context, ch <-chan PeerData) { if c.cancel != nil { @@ -171,13 +172,18 @@ func (c *PeerConnectionStrategy) shouldDialPeers(ctx context.Context) { return case <-ticker.C: isPaused := c.isPaused() - numPeers := len(c.host.Network().Peers()) - if numPeers >= c.minPeers && !isPaused { + _, outRelayPeers, err := c.host.Peerstore().(wps.WakuPeerstore).GroupPeersByDirection() + if err != nil { + c.logger.Info("Failed to get outRelayPeers from peerstore", zap.Error(err)) + continue + } + numPeers := outRelayPeers.Len() + if numPeers >= c.maxOutPeers && !isPaused { c.Lock() c.paused = true c.workerCancel() c.Unlock() - } else if numPeers < c.minPeers && isPaused { + } else if numPeers < c.maxOutPeers && isPaused { c.Lock() c.paused = false c.workerCtx, c.workerCancel = context.WithCancel(ctx) @@ -220,13 +226,13 @@ func (c *PeerConnectionStrategy) workPublisher(ctx context.Context) { return case p := <-c.peerCh: c.host.Peerstore().AddAddrs(p.AddrInfo.ID, p.AddrInfo.Addrs, peerstore.AddressTTL) - err := c.host.Peerstore().(peers.WakuPeerstore).SetOrigin(p.AddrInfo.ID, p.Origin) + err := c.host.Peerstore().(wps.WakuPeerstore).SetOrigin(p.AddrInfo.ID, p.Origin) if err != nil { c.logger.Error("could not set origin", zap.Error(err), logging.HostID("peer", p.AddrInfo.ID)) } if p.ENR != nil { - err = c.host.Peerstore().(peers.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR) + err = c.host.Peerstore().(wps.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR) if err != nil { c.logger.Error("could not store enr", zap.Error(err), logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String())) } @@ -250,7 +256,7 @@ const maxActiveDials = 5 func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) { defer c.wg.Done() - maxGoRoutines := c.minPeers + maxGoRoutines := c.maxOutPeers if maxGoRoutines > maxActiveDials { maxGoRoutines = maxActiveDials } @@ -299,7 +305,7 @@ func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) { defer cancel() err := c.host.Connect(ctx, pi) if err != nil && !errors.Is(err, context.Canceled) { - c.host.Peerstore().(peers.WakuPeerstore).AddConnFailure(pi) + c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi) c.logger.Info("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err)) } <-sem diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go new file mode 100644 index 00000000..0f070a3a --- /dev/null +++ b/waku/v2/peermanager/peer_manager.go @@ -0,0 +1,122 @@ +package peermanager + +import ( + "context" + "time" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" + wps "github.com/waku-org/go-waku/waku/v2/peerstore" + + "go.uber.org/zap" +) + +// TODO: Move all the protocol IDs to a common location. +// WakuRelayIDv200 is protocol ID for Waku v2 relay protocol +const WakuRelayIDv200 = protocol.ID("/vac/waku/relay/2.0.0") + +// PeerManager applies various controls and manage connections towards peers. +type PeerManager struct { + maxRelayPeers uint + logger *zap.Logger + InRelayPeersTarget uint + OutRelayPeersTarget uint + host host.Host +} + +const maxRelayPeersShare = 5 + +// const defaultMaxOutRelayPeersTarget = 10 +const outRelayPeersShare = 3 +const peerConnectivityLoopSecs = 15 + +// NewPeerManager creates a new peerManager instance. +func NewPeerManager(maxConnections uint, logger *zap.Logger) *PeerManager { + + maxRelayPeersValue := maxConnections - (maxConnections / maxRelayPeersShare) + outRelayPeersTargetValue := uint(maxRelayPeersValue / outRelayPeersShare) + + pm := &PeerManager{ + logger: logger.Named("peer-manager"), + maxRelayPeers: maxRelayPeersValue, + InRelayPeersTarget: maxRelayPeersValue - outRelayPeersTargetValue, + OutRelayPeersTarget: outRelayPeersTargetValue, + } + logger.Info("PeerManager init values", zap.Uint("maxConnections", maxConnections), + zap.Uint("maxRelayPeersValue", maxRelayPeersValue), zap.Uint("outRelayPeersTargetValue", outRelayPeersTargetValue), + zap.Uint("inRelayPeersTarget", pm.InRelayPeersTarget)) + + return pm +} + +func (pm *PeerManager) SetHost(host host.Host) { + pm.host = host +} + +// Start starts the processing to be done by peer manager. +func (pm *PeerManager) Start(ctx context.Context) { + go pm.connectivityLoop(ctx) +} + +// This is a connectivity loop, which currently checks and prunes inbound connections. +func (pm *PeerManager) connectivityLoop(ctx context.Context) { + t := time.NewTicker(peerConnectivityLoopSecs * time.Second) + for { + select { + case <-ctx.Done(): + return + case <-t.C: + pm.pruneInRelayConns() + } + } +} + +func (pm *PeerManager) filterPeersByProto(peers peer.IDSlice, proto ...protocol.ID) peer.IDSlice { + var filteredPeers peer.IDSlice + //TODO: This can be optimized once we have waku's own peerStore + + for _, p := range peers { + supportedProtocols, err := pm.host.Peerstore().SupportsProtocols(p, proto...) + if err != nil { + pm.logger.Warn("Failed to get supported protocols for peer", zap.String("peerID", p.String())) + continue + } + if len(supportedProtocols) != 0 { + filteredPeers = append(filteredPeers, p) + } + } + return filteredPeers +} + +func (pm *PeerManager) pruneInRelayConns() { + + var inRelayPeers peer.IDSlice + //Group peers by their connected direction inbound or outbound. + inPeers, outPeers, err := pm.host.Peerstore().(wps.WakuPeerstore).GroupPeersByDirection() + if err != nil { + return + } + pm.logger.Info("Number of peers connected", zap.Int("inPeers", inPeers.Len()), zap.Int("outPeers", outPeers.Len())) + + //Need to filter peers to check if they support relay + inRelayPeers = pm.filterPeersByProto(inPeers, WakuRelayIDv200) + outRelayPeers := pm.filterPeersByProto(outPeers, WakuRelayIDv200) + pm.logger.Info("Number of Relay peers connected", zap.Int("inRelayPeers", inRelayPeers.Len()), zap.Int("outRelayPeers", outRelayPeers.Len())) + + if inRelayPeers.Len() > int(pm.InRelayPeersTarget) { + //Start disconnecting peers, based on what? + //For now, just disconnect most recently connected peers + //TODO: Need to have more intelligent way of doing this, maybe peer scores. + pm.logger.Info("Number of in peer connections exceed targer relay peers, hence pruning", zap.Int("inRelayPeers", inRelayPeers.Len()), zap.Uint("inRelayPeersTarget", pm.InRelayPeersTarget)) + for pruningStartIndex := pm.InRelayPeersTarget; pruningStartIndex < uint(inRelayPeers.Len()); pruningStartIndex++ { + p := inRelayPeers[pruningStartIndex] + err := pm.host.Network().ClosePeer(p) + if err != nil { + pm.logger.Warn("Failed to disconnect connection towards peer", zap.String("peerID", p.String())) + } + pm.host.Peerstore().RemovePeer(p) //TODO: Should we remove the peer immediately? + pm.logger.Info("Successfully disconnected connection towards peer", zap.String("peerID", p.String())) + } + } +} diff --git a/waku/v2/peers/inherited.go b/waku/v2/peerstore/inherited.go similarity index 99% rename from waku/v2/peers/inherited.go rename to waku/v2/peerstore/inherited.go index 316307e8..938da9b3 100644 --- a/waku/v2/peers/inherited.go +++ b/waku/v2/peerstore/inherited.go @@ -1,4 +1,4 @@ -package peers +package peerstore import ( "context" diff --git a/waku/v2/peers/peerstore.go b/waku/v2/peerstore/waku_peer_store.go similarity index 53% rename from waku/v2/peers/peerstore.go rename to waku/v2/peerstore/waku_peer_store.go index d8b5b234..e1bf4568 100644 --- a/waku/v2/peers/peerstore.go +++ b/waku/v2/peerstore/waku_peer_store.go @@ -1,13 +1,16 @@ -package peers +package peerstore import ( "sync" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" ) +// Origin is used to determine how the peer is identified, +// either it is statically added or discovered via one of the discovery protocols type Origin int64 const ( @@ -21,17 +24,21 @@ const ( const peerOrigin = "origin" const peerENR = "enr" +const peerDirection = "direction" +// ConnectionFailures contains connection failure information towards all peers type ConnectionFailures struct { sync.RWMutex failures map[peer.ID]int } +// WakuPeerstoreImpl is a implementation of WakuPeerStore type WakuPeerstoreImpl struct { peerStore peerstore.Peerstore connFailures ConnectionFailures } +// WakuPeerstore is an interface for implementing WakuPeerStore type WakuPeerstore interface { SetOrigin(p peer.ID, origin Origin) error Origin(p peer.ID, origin Origin) (Origin, error) @@ -41,8 +48,13 @@ type WakuPeerstore interface { AddConnFailure(p peer.AddrInfo) ResetConnFailures(p peer.AddrInfo) ConnFailures(p peer.AddrInfo) int + + SetDirection(p peer.ID, direction network.Direction) error + Direction(p peer.ID) (network.Direction, error) + GroupPeersByDirection() (inPeers peer.IDSlice, outPeers peer.IDSlice, err error) } +// NewWakuPeerstore creates a new WakuPeerStore object func NewWakuPeerstore(p peerstore.Peerstore) peerstore.Peerstore { return &WakuPeerstoreImpl{ peerStore: p, @@ -52,10 +64,12 @@ func NewWakuPeerstore(p peerstore.Peerstore) peerstore.Peerstore { } } +// SetOrigin sets origin for a specific peer. func (ps *WakuPeerstoreImpl) SetOrigin(p peer.ID, origin Origin) error { return ps.peerStore.Put(p, peerOrigin, origin) } +// Origin fetches the origin for a specific peer. func (ps *WakuPeerstoreImpl) Origin(p peer.ID, origin Origin) (Origin, error) { result, err := ps.peerStore.Get(p, peerOrigin) if err != nil { @@ -65,6 +79,7 @@ func (ps *WakuPeerstoreImpl) Origin(p peer.ID, origin Origin) (Origin, error) { return result.(Origin), nil } +// PeersByOrigin returns the list of peers for a specific origin func (ps *WakuPeerstoreImpl) PeersByOrigin(origin Origin) peer.IDSlice { var result peer.IDSlice for _, p := range ps.Peers() { @@ -76,10 +91,12 @@ func (ps *WakuPeerstoreImpl) PeersByOrigin(origin Origin) peer.IDSlice { return result } +// SetENR sets the ENR record a peer func (ps *WakuPeerstoreImpl) SetENR(p peer.ID, enr *enode.Node) error { return ps.peerStore.Put(p, peerENR, enr) } +// ENR fetches the ENR record for a peer func (ps *WakuPeerstoreImpl) ENR(p peer.ID, origin Origin) (*enode.Node, error) { result, err := ps.peerStore.Get(p, peerENR) if err != nil { @@ -88,20 +105,54 @@ func (ps *WakuPeerstoreImpl) ENR(p peer.ID, origin Origin) (*enode.Node, error) return result.(*enode.Node), nil } +// AddConnFailure increments connectionFailures for a peer func (ps *WakuPeerstoreImpl) AddConnFailure(p peer.AddrInfo) { ps.connFailures.Lock() defer ps.connFailures.Unlock() ps.connFailures.failures[p.ID]++ } +// ResetConnFailures resets connectionFailures for a peer to 0 func (ps *WakuPeerstoreImpl) ResetConnFailures(p peer.AddrInfo) { ps.connFailures.Lock() defer ps.connFailures.Unlock() ps.connFailures.failures[p.ID] = 0 } +// ConnFailures fetches connectionFailures for a peer func (ps *WakuPeerstoreImpl) ConnFailures(p peer.AddrInfo) int { ps.connFailures.RLock() defer ps.connFailures.RUnlock() return ps.connFailures.failures[p.ID] } + +// SetDirection sets connection direction for a specific peer. +func (ps *WakuPeerstoreImpl) SetDirection(p peer.ID, direction network.Direction) error { + return ps.peerStore.Put(p, peerDirection, direction) +} + +// Direction fetches the connection direction (Inbound or outBound) for a specific peer +func (ps *WakuPeerstoreImpl) Direction(p peer.ID) (network.Direction, error) { + result, err := ps.peerStore.Get(p, peerDirection) + if err != nil { + return network.DirUnknown, err + } + + return result.(network.Direction), nil +} + +// GroupPeersByDirection returns all the peers in peer store grouped by Inbound or outBound direction +func (ps *WakuPeerstoreImpl) GroupPeersByDirection() (inPeers peer.IDSlice, outPeers peer.IDSlice, err error) { + + for _, p := range ps.Peers() { + direction, err := ps.Direction(p) + if err == nil { + if direction == network.DirInbound { + inPeers = append(inPeers, p) + } else if direction == network.DirOutbound { + outPeers = append(outPeers, p) + } + } + } + return inPeers, outPeers, nil +} diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 8df889da..446c1ea2 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -36,7 +36,7 @@ type WakuLightPush struct { log *zap.Logger } -// NewWakuRelay returns a new instance of Waku Lightpush struct +// NewWakuLightPush returns a new instance of Waku Lightpush struct func NewWakuLightPush(relay *relay.WakuRelay, log *zap.Logger) *WakuLightPush { wakuLP := new(WakuLightPush) wakuLP.relay = relay diff --git a/waku/v2/protocol/peer_exchange/client.go b/waku/v2/protocol/peer_exchange/client.go index cdb8f81d..0f1470a1 100644 --- a/waku/v2/protocol/peer_exchange/client.go +++ b/waku/v2/protocol/peer_exchange/client.go @@ -10,9 +10,9 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-msgio/pbio" - v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/metrics" - "github.com/waku-org/go-waku/waku/v2/peers" + "github.com/waku-org/go-waku/waku/v2/peermanager" + "github.com/waku-org/go-waku/waku/v2/peerstore" wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb" "go.uber.org/zap" @@ -104,12 +104,12 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb go func() { defer wakuPX.wg.Done() - peerCh := make(chan v2.PeerData) + peerCh := make(chan peermanager.PeerData) defer close(peerCh) wakuPX.peerConnector.Subscribe(ctx, peerCh) for _, p := range discoveredPeers { - peer := v2.PeerData{ - Origin: peers.PeerExchange, + peer := peermanager.PeerData{ + Origin: peerstore.PeerExchange, AddrInfo: p.addrInfo, ENR: p.enr, } diff --git a/waku/v2/protocol/peer_exchange/protocol.go b/waku/v2/protocol/peer_exchange/protocol.go index bf8f011d..7c6a05f3 100644 --- a/waku/v2/protocol/peer_exchange/protocol.go +++ b/waku/v2/protocol/peer_exchange/protocol.go @@ -13,9 +13,9 @@ import ( libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-msgio/pbio" "github.com/waku-org/go-waku/logging" - v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/discv5" "github.com/waku-org/go-waku/waku/v2/metrics" + "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb" @@ -33,7 +33,7 @@ var ( // PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol type PeerConnector interface { - Subscribe(context.Context, <-chan v2.PeerData) + Subscribe(context.Context, <-chan peermanager.PeerData) } type WakuPeerExchange struct { diff --git a/waku/v2/rendezvous/rendezvous.go b/waku/v2/rendezvous/rendezvous.go index 1c2a0726..606cc6a7 100644 --- a/waku/v2/rendezvous/rendezvous.go +++ b/waku/v2/rendezvous/rendezvous.go @@ -10,8 +10,8 @@ import ( "github.com/libp2p/go-libp2p/core/host" rvs "github.com/waku-org/go-libp2p-rendezvous" - v2 "github.com/waku-org/go-waku/waku/v2" - "github.com/waku-org/go-waku/waku/v2/peers" + "github.com/waku-org/go-waku/waku/v2/peermanager" + "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" "go.uber.org/zap" ) @@ -39,7 +39,7 @@ type Rendezvous struct { // PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol type PeerConnector interface { - Subscribe(context.Context, <-chan v2.PeerData) + Subscribe(context.Context, <-chan peermanager.PeerData) } // NewRendezvous creates an instance of Rendezvous struct @@ -105,12 +105,12 @@ func (r *Rendezvous) DiscoverWithNamespace(ctx context.Context, namespace string if len(addrInfo) != 0 { rp.SetSuccess(cookie) - peerCh := make(chan v2.PeerData) + peerCh := make(chan peermanager.PeerData) defer close(peerCh) r.peerConnector.Subscribe(ctx, peerCh) for _, p := range addrInfo { - peer := v2.PeerData{ - Origin: peers.Rendezvous, + peer := peermanager.PeerData{ + Origin: peerstore.Rendezvous, AddrInfo: p, } select { diff --git a/waku/v2/rendezvous/rendezvous_test.go b/waku/v2/rendezvous/rendezvous_test.go index bdf5cf31..8310be3f 100644 --- a/waku/v2/rendezvous/rendezvous_test.go +++ b/waku/v2/rendezvous/rendezvous_test.go @@ -14,17 +14,16 @@ import ( "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/persistence/sqlite" - v2 "github.com/waku-org/go-waku/waku/v2" + "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/utils" ) type PeerConn struct { - ch <-chan v2.PeerData + ch <-chan peermanager.PeerData } -func (p *PeerConn) Subscribe(ctx context.Context, ch <-chan v2.PeerData) { +func (p *PeerConn) Subscribe(ctx context.Context, ch <-chan peermanager.PeerData) { p.ch = ch - } func NewPeerConn() *PeerConn {