diff --git a/tests/v2/test_peer_manager.nim b/tests/v2/test_peer_manager.nim index b11f96c48..65961372f 100644 --- a/tests/v2/test_peer_manager.nim +++ b/tests/v2/test_peer_manager.nim @@ -47,7 +47,7 @@ procSuite "Peer Manager": # Check connectedness check: - node1.peerManager.connectedness(peerInfo2.peerId) + node1.peerManager.connectedness(peerInfo2.peerId) == Connectedness.Connected await allFutures([node1.stop(), node2.stop()]) @@ -118,3 +118,45 @@ procSuite "Peer Manager": it.protos.contains(WakuStoreCodec)) await node.stop() + + + asyncTest "Peer manager keeps track of connections": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), + Port(60000)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), + Port(60002)) + peerInfo2 = node2.peerInfo + + await node1.start() + + node1.mountRelay() + node2.mountRelay() + + # Test default connectedness for new peers + node1.peerManager.addPeer(peerInfo2, WakuRelayCodec) + check: + # No information about node2's connectedness + node1.peerManager.connectedness(peerInfo2.peerId) == NotConnected + + # Purposefully don't start node2 + # Attempt dialing node2 from node1 + discard await node1.peerManager.dialPeer(peerInfo2, WakuRelayCodec, 2.seconds) + check: + # Cannot connect to node2 + node1.peerManager.connectedness(peerInfo2.peerId) == CannotConnect + + # Successful connection + await node2.start() + discard await node1.peerManager.dialPeer(peerInfo2, WakuRelayCodec, 2.seconds) + check: + # Currently connected to node2 + node1.peerManager.connectedness(peerInfo2.peerId) == Connected + + # Stop node. Gracefully disconnect from all peers. + await node1.stop() + check: + # Not currently connected to node2, but had recent, successful connection. + node1.peerManager.connectedness(peerInfo2.peerId) == CanConnect diff --git a/waku/v2/node/jsonrpc/admin_api.nim b/waku/v2/node/jsonrpc/admin_api.nim index 709c9f628..819b02e19 100644 --- a/waku/v2/node/jsonrpc/admin_api.nim +++ b/waku/v2/node/jsonrpc/admin_api.nim @@ -42,7 +42,7 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = wPeers.insert(node.peerManager.peers(WakuRelayCodec) .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId), protocol: WakuRelayCodec, - connected: node.peerManager.connectedness(it.peerId))), + connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)), wPeers.len) # Append to the end of the sequence if not node.wakuFilter.isNil: @@ -50,7 +50,7 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = wPeers.insert(node.peerManager.peers(WakuFilterCodec) .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId), protocol: WakuFilterCodec, - connected: node.peerManager.connectedness(it.peerId))), + connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)), wPeers.len) # Append to the end of the sequence if not node.wakuSwap.isNil: @@ -58,7 +58,7 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = wPeers.insert(node.peerManager.peers(WakuSwapCodec) .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId), protocol: WakuSwapCodec, - connected: node.peerManager.connectedness(it.peerId))), + connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)), wPeers.len) # Append to the end of the sequence if not node.wakuStore.isNil: @@ -66,7 +66,7 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = wPeers.insert(node.peerManager.peers(WakuStoreCodec) .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId), protocol: WakuStoreCodec, - connected: node.peerManager.connectedness(it.peerId))), + connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)), wPeers.len) # Append to the end of the sequence # @TODO filter output on protocol/connected-status diff --git a/waku/v2/node/peer_manager.nim b/waku/v2/node/peer_manager.nim index 58d86666f..11ff6d393 100644 --- a/waku/v2/node/peer_manager.nim +++ b/waku/v2/node/peer_manager.nim @@ -14,16 +14,54 @@ logScope: topics = "wakupeers" type + Connectedness* = enum + # NotConnected: default state for a new peer. No connection and no further information on connectedness. + NotConnected, + # CannotConnect: attempted to connect to peer, but failed. + CannotConnect, + # CanConnect: was recently connected to peer and disconnected gracefully. + CanConnect, + # Connected: actively connected to peer. + Connected + + ConnectionBook* = object of PeerBook[Connectedness] + + WakuPeerStore* = ref object of PeerStore + connectionBook*: ConnectionBook + PeerManager* = ref object of RootObj switch*: Switch - peerStore*: PeerStore + peerStore*: WakuPeerStore const defaultDialTimeout = 1.minutes # @TODO should this be made configurable? +proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} = + case event.kind + of ConnEventKind.Connected: + pm.peerStore.connectionBook.set(peerId, Connected) + return + of ConnEventKind.Disconnected: + pm.peerStore.connectionBook.set(peerId, CanConnect) + return + +proc new*(T: type WakuPeerStore): WakuPeerStore = + var p: WakuPeerStore + new(p) + return p + proc new*(T: type PeerManager, switch: Switch): PeerManager = - T(switch: switch, - peerStore: PeerStore.new()) + let pm = PeerManager(switch: switch, + peerStore: WakuPeerStore.new()) + + proc peerHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} = + onConnEvent(pm, peerId, event) + + + pm.switch.addConnEventHandler(peerHook, ConnEventKind.Connected) + pm.switch.addConnEventHandler(peerHook, ConnEventKind.Disconnected) + + return pm #################### # Helper functions # @@ -46,7 +84,7 @@ proc peers*(pm: PeerManager, proto: string): seq[StoredInfo] = # Return the known info for all peers registered on the specified protocol pm.peers.filterIt(it.protos.contains(proto)) -proc connectedness*(pm: PeerManager, peerId: PeerId): bool = +proc connectedness*(pm: PeerManager, peerId: PeerId): Connectedness = # Return the connection state of the given, managed peer # @TODO the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc. # @TODO richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts @@ -55,9 +93,9 @@ proc connectedness*(pm: PeerManager, peerId: PeerId): bool = if (storedInfo == StoredInfo()): # Peer is not managed, therefore not connected - return false + return NotConnected else: - pm.switch.isConnected(peerId) + pm.peerStore.connectionBook.get(peerId) proc hasPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string): bool = # Returns `true` if peer is included in manager for the specified protocol @@ -123,10 +161,12 @@ proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string, dialTimeout = # @TODO indicate CannotConnect on peer metadata debug "Dialing remote peer timed out" waku_peers_dials.inc(labelValues = ["timeout"]) + pm.peerStore.connectionBook.set(peerInfo.peerId, CannotConnect) return none(Connection) except CatchableError as e: # @TODO any redial attempts? # @TODO indicate CannotConnect on peer metadata debug "Dialing remote peer failed", msg = e.msg waku_peers_dials.inc(labelValues = ["failed"]) + pm.peerStore.connectionBook.set(peerInfo.peerId, CannotConnect) return none(Connection) diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index 79f5ddee3..ca972600d 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -394,7 +394,7 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn let peerOpt = w.peerManager.selectPeer(WakuStoreCodec) if peerOpt.isNone(): - error "failed to connect to remote peer" + error "no suitable remote peers" waku_store_errors.inc(labelValues = [dialFailure]) return @@ -433,7 +433,7 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand let peerOpt = ws.peerManager.selectPeer(WakuStoreCodec) if peerOpt.isNone(): - error "failed to connect to remote peer" + error "no suitable remote peers" waku_store_errors.inc(labelValues = [dialFailure]) return diff --git a/waku/v2/protocol/waku_swap/waku_swap.nim b/waku/v2/protocol/waku_swap/waku_swap.nim index 7f29b9b87..7ec827d71 100644 --- a/waku/v2/protocol/waku_swap/waku_swap.nim +++ b/waku/v2/protocol/waku_swap/waku_swap.nim @@ -93,7 +93,7 @@ proc sendCheque*(ws: WakuSwap) {.async.} = let peerOpt = ws.peerManager.selectPeer(WakuSwapCodec) if peerOpt.isNone(): - error "failed to connect to remote peer" + error "no suitable remote peers" waku_swap_errors.inc(labelValues = [dialFailure]) return