From 68acf82c95beb85b78c48bb43ce9112a511bb3b0 Mon Sep 17 00:00:00 2001 From: Alvaro Revuelta Date: Mon, 27 Feb 2023 18:24:31 +0100 Subject: [PATCH] refactor(networking): peermanager refactor and cleanups (#1539) * refactor(networking): use addServicePeer where needed + add metrics --- apps/chat2/chat2.nim | 7 +- apps/chat2bridge/chat2bridge.nim | 10 +- apps/wakubridge/wakubridge.nim | 8 +- tests/v2/test_peer_manager.nim | 26 ++--- .../wakunode_jsonrpc/test_jsonrpc_admin.nim | 8 +- .../wakunode_jsonrpc/test_jsonrpc_filter.nim | 3 +- .../wakunode_jsonrpc/test_jsonrpc_store.nim | 3 +- waku/v2/node/peer_manager/peer_manager.nim | 104 ++++++++++-------- waku/v2/node/waku_node.nim | 59 +--------- waku/v2/protocol/waku_swap/waku_swap.nim | 16 +-- 10 files changed, 106 insertions(+), 138 deletions(-) diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index d44fc99d1..ba9e49b73 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -24,6 +24,7 @@ import libp2p/[switch, # manage transports, a single entry poi nameresolving/dnsresolver]# define DNS resolution import ../../waku/v2/protocol/waku_message, + ../../waku/v2/protocol/waku_lightpush, ../../waku/v2/protocol/waku_lightpush/rpc, ../../waku/v2/protocol/waku_filter, ../../waku/v2/protocol/waku_store, @@ -489,7 +490,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = echo "Connecting to storenode: " & $(storenode.get()) node.mountStoreClient() - node.setStorePeer(storenode.get()) + node.peerManager.addServicePeer(storenode.get(), WakuStoreCodec) proc storeHandler(response: HistoryResponse) {.gcsafe.} = for msg in response.messages: @@ -509,13 +510,13 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = await mountLightPush(node) node.mountLightPushClient() - node.setLightPushPeer(conf.lightpushnode) + node.peerManager.addServicePeer(parseRemotePeerInfo(conf.lightpushnode), WakuLightpushCodec) if conf.filternode != "": await node.mountFilter() await node.mountFilterClient() - node.setFilterPeer(parseRemotePeerInfo(conf.filternode)) + node.peerManager.addServicePeer(parseRemotePeerInfo(conf.filternode), WakuFilterCodec) proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.gcsafe.} = trace "Hit filter handler", contentTopic=msg.contentTopic diff --git a/apps/chat2bridge/chat2bridge.nim b/apps/chat2bridge/chat2bridge.nim index 2dfa2bdf9..93eec5a44 100644 --- a/apps/chat2bridge/chat2bridge.nim +++ b/apps/chat2bridge/chat2bridge.nim @@ -16,6 +16,10 @@ import libp2p/errors, ../../../waku/v2/protocol/waku_message, ../../../waku/v2/node/waku_node, + ../../../waku/v2/utils/peers, + ../../../waku/v2/node/peer_manager, + ../../waku/v2/protocol/waku_filter, + ../../waku/v2/protocol/waku_store, # Chat 2 imports ../chat2/chat2, # Common cli config @@ -281,10 +285,12 @@ when isMainModule: waitFor connectToNodes(bridge.nodev2, conf.staticnodes) if conf.storenode != "": - setStorePeer(bridge.nodev2, conf.storenode) + let storePeer = parseRemotePeerInfo(conf.storenode) + bridge.nodev2.peerManager.addServicePeer(storePeer, WakuStoreCodec) if conf.filternode != "": - setFilterPeer(bridge.nodev2, conf.filternode) + let filterPeer = parseRemotePeerInfo(conf.filternode) + bridge.nodev2.peerManager.addServicePeer(filterPeer, WakuFilterCodec) if conf.rpc: let ta = initTAddress(conf.rpcAddress, diff --git a/apps/wakubridge/wakubridge.nim b/apps/wakubridge/wakubridge.nim index 4a5a95cc7..700994a51 100644 --- a/apps/wakubridge/wakubridge.nim +++ b/apps/wakubridge/wakubridge.nim @@ -26,6 +26,8 @@ import libp2p/nameresolving/nameresolver, ../../waku/v2/utils/time, ../../waku/v2/protocol/waku_message, + ../../waku/v2/protocol/waku_store, + ../../waku/v2/protocol/waku_filter, ../../waku/v2/node/message_cache, ../../waku/v2/node/waku_node, ../../waku/v2/node/peer_manager, @@ -428,11 +430,13 @@ when isMainModule: if conf.storenode != "": mountStoreClient(bridge.nodev2) - setStorePeer(bridge.nodev2, conf.storenode) + let storeNode = parseRemotePeerInfo(conf.storenode) + bridge.nodev2.peerManager.addServicePeer(storeNode, WakuStoreCodec) if conf.filternode != "": waitFor mountFilterClient(bridge.nodev2) - setFilterPeer(bridge.nodev2, conf.filternode) + let filterNode = parseRemotePeerInfo(conf.filternode) + bridge.nodev2.peerManager.addServicePeer(filterNode, WakuFilterCodec) if conf.rpc: let ta = initTAddress(conf.rpcAddress, diff --git a/tests/v2/test_peer_manager.nim b/tests/v2/test_peer_manager.nim index b7a92231d..7dfa9a343 100644 --- a/tests/v2/test_peer_manager.nim +++ b/tests/v2/test_peer_manager.nim @@ -96,10 +96,9 @@ procSuite "Peer Manager": await node.mountSwap() node.mountStoreClient() - node.wakuSwap.setPeer(swapPeer.toRemotePeerInfo()) - - node.setStorePeer(storePeer.toRemotePeerInfo()) - node.setFilterPeer(filterPeer.toRemotePeerInfo()) + node.peerManager.addServicePeer(swapPeer.toRemotePeerInfo(), WakuSwapCodec) + node.peerManager.addServicePeer(storePeer.toRemotePeerInfo(), WakuStoreCodec) + node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuFilterCodec) # Check peers were successfully added to peer manager check: @@ -127,7 +126,7 @@ procSuite "Peer Manager": await allFutures(nodes.mapIt(it.mountRelay())) # Test default connectedness for new peers - nodes[0].peerManager.addPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec) + nodes[0].peerManager.addPeer(nodes[1].peerInfo.toRemotePeerInfo()) check: # No information about node2's connectedness nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == NotConnected @@ -160,7 +159,7 @@ procSuite "Peer Manager": await nodes[0].start() await nodes[0].mountRelay() - nodes[0].peerManager.addPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec) + nodes[0].peerManager.addPeer(nodes[1].peerInfo.toRemotePeerInfo()) # Set a low backoff to speed up test: 2, 4, 8, 16 nodes[0].peerManager.initialBackoffInSec = 2 @@ -236,7 +235,8 @@ procSuite "Peer Manager": node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected - await node3.mountRelay() # This should trigger a reconnect + await node3.mountRelay() + await node3.peerManager.connectToRelayPeers() check: # Reconnected to node2 after "restart" @@ -313,12 +313,12 @@ procSuite "Peer Manager": let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo()) # Add all peers (but self) to node 0 - nodes[0].peerManager.addPeer(peerInfos[1], WakuRelayCodec) - nodes[0].peerManager.addPeer(peerInfos[2], WakuRelayCodec) - nodes[0].peerManager.addPeer(peerInfos[3], WakuRelayCodec) + nodes[0].peerManager.addPeer(peerInfos[1]) + nodes[0].peerManager.addPeer(peerInfos[2]) + nodes[0].peerManager.addPeer(peerInfos[3]) - # Attempt to connect to all known peers supporting a given protocol - await nodes[0].peerManager.reconnectPeers(WakuRelayCodec, protocolMatcher(WakuRelayCodec)) + # Connect to relay peers + await nodes[0].peerManager.connectToRelayPeers() check: # Peerstore track all three peers @@ -512,7 +512,7 @@ procSuite "Peer Manager": # Create 15 peers and add them to the peerstore let peers = toSeq(1..15).mapIt(parseRemotePeerInfo("/ip4/0.0.0.0/tcp/0/p2p/" & $PeerId.random().get())) - for p in peers: pm.addPeer(p, "") + for p in peers: pm.addPeer(p) # Check that we have 15 peers in the peerstore check: diff --git a/tests/v2/wakunode_jsonrpc/test_jsonrpc_admin.nim b/tests/v2/wakunode_jsonrpc/test_jsonrpc_admin.nim index b8971e42a..1824cf276 100644 --- a/tests/v2/wakunode_jsonrpc/test_jsonrpc_admin.nim +++ b/tests/v2/wakunode_jsonrpc/test_jsonrpc_admin.nim @@ -167,8 +167,12 @@ procSuite "Waku v2 JSON-RPC API - Admin": filterPeer = PeerInfo.new(generateEcdsaKey(), @[locationAddr]) storePeer = PeerInfo.new(generateEcdsaKey(), @[locationAddr]) - node.setStorePeer(storePeer.toRemotePeerInfo()) - node.setFilterPeer(filterPeer.toRemotePeerInfo()) + node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuFilterCodec) + node.peerManager.addServicePeer(storePeer.toRemotePeerInfo(), WakuStoreCodec) + + # Mock that we connected in the past so Identify populated this + node.peerManager.peerStore[ProtoBook][filterPeer.peerId] = @[WakuFilterCodec] + node.peerManager.peerStore[ProtoBook][storePeer.peerId] = @[WakuStoreCodec] let response = await client.get_waku_v2_admin_v1_peers() diff --git a/tests/v2/wakunode_jsonrpc/test_jsonrpc_filter.nim b/tests/v2/wakunode_jsonrpc/test_jsonrpc_filter.nim index be3f18f69..ab7ab0271 100644 --- a/tests/v2/wakunode_jsonrpc/test_jsonrpc_filter.nim +++ b/tests/v2/wakunode_jsonrpc/test_jsonrpc_filter.nim @@ -14,6 +14,7 @@ import ../../../waku/v2/node/jsonrpc/filter/handlers as filter_api, ../../../waku/v2/node/jsonrpc/filter/client as filter_api_client, ../../../waku/v2/protocol/waku_message, + ../../../waku/v2/protocol/waku_filter, ../../../waku/v2/protocol/waku_filter/rpc, ../../../waku/v2/protocol/waku_filter/client, ../../../waku/v2/utils/peers, @@ -40,7 +41,7 @@ procSuite "Waku v2 JSON-RPC API - Filter": await node1.mountFilter() await node2.mountFilterClient() - node2.setFilterPeer(node1.peerInfo.toRemotePeerInfo()) + node2.peerManager.addServicePeer(node1.peerInfo.toRemotePeerInfo(), WakuFilterCodec) # RPC server setup let diff --git a/tests/v2/wakunode_jsonrpc/test_jsonrpc_store.nim b/tests/v2/wakunode_jsonrpc/test_jsonrpc_store.nim index c8895274b..3437278e6 100644 --- a/tests/v2/wakunode_jsonrpc/test_jsonrpc_store.nim +++ b/tests/v2/wakunode_jsonrpc/test_jsonrpc_store.nim @@ -16,6 +16,7 @@ import ../../../waku/v2/protocol/waku_message, ../../../waku/v2/protocol/waku_archive, ../../../waku/v2/protocol/waku_archive/driver/queue_driver, + ../../../waku/v2/protocol/waku_store, ../../../waku/v2/protocol/waku_store/rpc, ../../../waku/v2/utils/peers, ../../../waku/v2/utils/time, @@ -66,7 +67,7 @@ procSuite "Waku v2 JSON-RPC API - Store": var listenSwitch = newStandardSwitch(some(key)) await listenSwitch.start() - node.setStorePeer(listenSwitch.peerInfo.toRemotePeerInfo()) + node.peerManager.addServicePeer(listenSwitch.peerInfo.toRemotePeerInfo(), WakuStoreCodec) listenSwitch.mount(node.wakuRelay) listenSwitch.mount(node.wakuStore) diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim index 172021a06..cb7834002 100644 --- a/waku/v2/node/peer_manager/peer_manager.nim +++ b/waku/v2/node/peer_manager/peer_manager.nim @@ -5,7 +5,7 @@ else: import - std/[options, sets, sequtils, times], + std/[options, sets, sequtils, times, strutils], chronos, chronicles, metrics, @@ -22,8 +22,9 @@ declareCounter waku_peers_dials, "Number of peer dials", ["outcome"] # TODO: Populate from PeerStore.Source when ready declarePublicCounter waku_node_conns_initiated, "Number of connections initiated", ["source"] declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"] -declarePublicGauge waku_connected_peers, "Number of connected peers per direction: inbound|outbound", ["direction"] +declarePublicGauge waku_connected_peers, "Number of connected peers per direction", ["direction"] declarePublicGauge waku_peer_store_size, "Number of peers managed by the peer store" +declarePublicGauge waku_service_peers, "Service peer protocol and multiaddress ", labels = ["protocol", "peerId"] logScope: topics = "waku node peer_manager" @@ -61,6 +62,16 @@ type serviceSlots*: Table[string, RemotePeerInfo] started: bool +proc protocolMatcher*(codec: string): Matcher = + ## Returns a protocol matcher function for the provided codec + proc match(proto: string): bool {.gcsafe.} = + ## Matches a proto with any postfix to the provided codec. + ## E.g. if the codec is `/vac/waku/filter/2.0.0` it matches the protos: + ## `/vac/waku/filter/2.0.0`, `/vac/waku/filter/2.0.0-beta3`, `/vac/waku/filter/2.0.0-actualnonsense` + return proto.startsWith(codec) + + return match + #################### # Helper functions # #################### @@ -244,7 +255,7 @@ proc new*(T: type PeerManager, # Manager interface # ##################### -proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) = +proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo) = # Adds peer to manager for the specified protocol if remotePeerInfo.peerId == pm.switch.peerInfo.peerId: @@ -260,14 +271,11 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) = # Peer already managed return - debug "Adding peer to manager", peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs, proto = proto + trace "Adding peer to manager", peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey - # TODO: Remove this once service slots is ready - pm.peerStore[ProtoBook][remotePeerInfo.peerId] = pm.peerStore[ProtoBook][remotePeerInfo.peerId] & proto - # Add peer to storage. Entry will subsequently be updated with connectedness information if not pm.storage.isNil: pm.storage.insertOrReplace(remotePeerInfo.peerId, pm.peerStore.get(remotePeerInfo.peerId), NotConnected) @@ -279,23 +287,23 @@ proc addServicePeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: str return info "Adding peer to service slots", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], service = proto + waku_service_peers.set(1, labelValues = [$proto, $remotePeerInfo.addrs[0]]) # Set peer for service slot pm.serviceSlots[proto] = remotePeerInfo - # TODO: Remove proto once fully refactored - pm.addPeer(remotePeerInfo, proto) + pm.addPeer(remotePeerInfo) proc reconnectPeers*(pm: PeerManager, proto: string, - protocolMatcher: Matcher, backoff: chronos.Duration = chronos.seconds(0)) {.async.} = ## Reconnect to peers registered for this protocol. This will update connectedness. ## Especially useful to resume connections from persistent storage after a restart. debug "Reconnecting peers", proto=proto - for storedInfo in pm.peerStore.peers(protocolMatcher): + # Proto is not persisted, we need to iterate over all peers. + for storedInfo in pm.peerStore.peers(protocolMatcher(proto)): # Check that the peer can be connected if storedInfo.connectedness == CannotConnect: debug "Not reconnecting to unreachable or non-existing peer", peerId=storedInfo.peerId @@ -332,10 +340,11 @@ proc dialPeer*(pm: PeerManager, # Dial a given peer and add it to the list of known peers # TODO: check peer validity and score before continuing. Limit number of peers to be managed. - # First add dialed peer info to peer store, if it does not exist yet... + # First add dialed peer info to peer store, if it does not exist yet.. + # TODO: nim libp2p peerstore already adds them if not pm.peerStore.hasPeer(remotePeerInfo.peerId, proto): trace "Adding newly dialed peer to manager", peerId= $remotePeerInfo.peerId, address= $remotePeerInfo.addrs[0], proto= proto - pm.addPeer(remotePeerInfo, proto) + pm.addPeer(remotePeerInfo) return await pm.dialPeer(remotePeerInfo.peerId,remotePeerInfo.addrs, proto, dialTimeout, source) @@ -380,38 +389,32 @@ proc connectToNodes*(pm: PeerManager, # later. await sleepAsync(chronos.seconds(5)) -# Ensures a healthy amount of connected relay peers -proc relayConnectivityLoop*(pm: PeerManager) {.async.} = - debug "Starting relay connectivity loop" - while pm.started: +proc connectToRelayPeers*(pm: PeerManager) {.async.} = + let maxConnections = pm.switch.connManager.inSema.size + let numInPeers = pm.switch.connectedPeers(lpstream.Direction.In).len + let numOutPeers = pm.switch.connectedPeers(lpstream.Direction.Out).len + let numConPeers = numInPeers + numOutPeers - let maxConnections = pm.switch.connManager.inSema.size - let numInPeers = pm.switch.connectedPeers(lpstream.Direction.In).len - let numOutPeers = pm.switch.connectedPeers(lpstream.Direction.Out).len - let numConPeers = numInPeers + numOutPeers + # TODO: Enforce a given in/out peers ratio - # TODO: Enforce a given in/out peers ratio + # Leave some room for service peers + if numConPeers >= (maxConnections - 5): + return - # Leave some room for service peers - if numConPeers >= (maxConnections - 5): - await sleepAsync(ConnectivityLoopInterval) - continue + # TODO: Track only relay connections (nwaku/issues/1566) + let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs)) + let outsideBackoffPeers = notConnectedPeers.filterIt(pm.peerStore.canBeConnected(it.peerId, + pm.initialBackoffInSec, + pm.backoffFactor)) + let numPeersToConnect = min(min(maxConnections - numConPeers, outsideBackoffPeers.len), MaxParalelDials) - let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs)) - let outsideBackoffPeers = notConnectedPeers.filterIt(pm.peerStore.canBeConnected(it.peerId, - pm.initialBackoffInSec, - pm.backoffFactor)) - let numPeersToConnect = min(min(maxConnections - numConPeers, outsideBackoffPeers.len), MaxParalelDials) + info "Relay peer connections", + connectedPeers = numConPeers, + targetConnectedPeers = maxConnections, + notConnectedPeers = notConnectedPeers.len, + outsideBackoffPeers = outsideBackoffPeers.len - info "Relay connectivity loop", - connectedPeers = numConPeers, - targetConnectedPeers = maxConnections, - notConnectedPeers = notConnectedPeers.len, - outsideBackoffPeers = outsideBackoffPeers.len - - await pm.connectToNodes(outsideBackoffPeers[0..