diff --git a/.gitmodules b/.gitmodules index 0365531bf..fb723efb9 100644 --- a/.gitmodules +++ b/.gitmodules @@ -12,7 +12,7 @@ path = vendor/nim-libp2p url = https://github.com/status-im/nim-libp2p.git ignore = dirty - branch = master + branch = unstable [submodule "vendor/nim-stew"] path = vendor/nim-stew url = https://github.com/status-im/nim-stew.git diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index 083fc7b0c..adbc9dc78 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -10,5 +10,6 @@ import ./v2/test_waku_swap, ./v2/test_message_store, ./v2/test_jsonrpc_waku, + ./v2/test_peer_manager, ./v2/test_web3 # will remove it when rln-relay tests get added # TODO ./v2/test_waku_rln_relay diff --git a/tests/v2/test_peer_manager.nim b/tests/v2/test_peer_manager.nim new file mode 100644 index 000000000..57c06e45b --- /dev/null +++ b/tests/v2/test_peer_manager.nim @@ -0,0 +1,48 @@ +import + std/[unittest, options, sets, tables, sequtils], + stew/shims/net as stewNet, + json_rpc/[rpcserver, rpcclient], + eth/[keys, rlp], eth/common/eth_types, + libp2p/[standard_setup, switch, multiaddress], + libp2p/protobuf/minprotobuf, + libp2p/stream/[bufferstream, connection], + libp2p/crypto/crypto, + libp2p/protocols/pubsub/pubsub, + libp2p/protocols/pubsub/rpc/message, + ../../waku/v2/node/wakunode2, + ../../waku/v2/node/peer_manager, + ../../waku/v2/protocol/waku_relay, + ../test_helpers + +procSuite "Peer Manager": + asyncTest "Peer dialing": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), + Port(60000)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), + Port(60002)) + peerInfo2 = node2.peerInfo + + await allFutures([node1.start(), node2.start()]) + + node1.mountRelay() + node2.mountRelay() + + # Dial node2 from node1 + let conn = await node1.peerManager.dialPeer(peerInfo2, WakuRelayCodec) + + # Check connection + check: + conn.activity + conn.peerInfo.peerId == peerInfo2.peerId + + # Check that node2 is being managed in node1 + check: + node1.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId) + + # Check connectedness + check: + node1.peerManager.connectedness(peerInfo2.peerId) + diff --git a/vendor/nim-libp2p b/vendor/nim-libp2p index 5aebf0990..090ea91cb 160000 --- a/vendor/nim-libp2p +++ b/vendor/nim-libp2p @@ -1 +1 @@ -Subproject commit 5aebf0990e5315a0349e2414e7bc11da96f703bc +Subproject commit 090ea91cb06ac6e35fd1401c2214871b24a77363 diff --git a/waku/v2/node/peer_manager.nim b/waku/v2/node/peer_manager.nim new file mode 100644 index 000000000..ccad7d3fb --- /dev/null +++ b/waku/v2/node/peer_manager.nim @@ -0,0 +1,67 @@ +{.push raises: [Defect, Exception].} + +import + chronos, chronicles, + libp2p/standard_setup, + libp2p/peerstore + +type + PeerManager* = ref object of RootObj + switch*: Switch + peerStore*: PeerStore + +proc new*(T: type PeerManager, switch: Switch): PeerManager = + T(switch: switch, + peerStore: PeerStore.new()) + +#################### +# Dialer interface # +#################### + +proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string): Future[Connection] {.async.} = + # Dial a given peer and add it to the list of known peers + # @TODO check peer validity, duplicates and score before continuing. Limit number of peers to be managed. + + # First add dialed peer info to peer store... + + debug "Adding dialed peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto + + # ...known addresses + for multiaddr in peerInfo.addrs: + pm.peerStore.addressBook.add(peerInfo.peerId, multiaddr) + + # ...public key + var publicKey: PublicKey + discard peerInfo.peerId.extractPublicKey(publicKey) + + pm.peerStore.keyBook.set(peerInfo.peerId, publicKey) + + # ...associated protocols + pm.peerStore.protoBook.add(peerInfo.peerId, proto) + + info "Dialing peer from manager", wireAddr = peerInfo.addrs[0], peerId = peerInfo.peerId + + # Dial Peer + # @TODO Keep track of conn and connected state in peer store + return await pm.switch.dial(peerInfo.peerId, peerInfo.addrs, proto) + +##################### +# Manager interface # +##################### + +proc peers*(pm: PeerManager): seq[StoredInfo] = + # Return the known info for all peers + pm.peerStore.peers() + +proc connectedness*(pm: PeerManager, peerId: PeerId): bool = + # Return the connection state of the given, managed peer + # @TODO the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc. + # @TODO richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts + + let storedInfo = pm.peerStore.get(peerId) + + if (storedInfo == StoredInfo()): + # Peer is not managed, therefore not connected + return false + else: + pm.switch.isConnected(peerId) diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 3498c27e1..9485c7a00 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -8,7 +8,6 @@ import libp2p/protocols/protocol, # NOTE For TopicHandler, solve with exports? libp2p/protocols/pubsub/pubsub, - libp2p/peerinfo, libp2p/standard_setup, ../protocol/[waku_relay, message_notifier], ../protocol/waku_store/waku_store, @@ -16,7 +15,8 @@ import ../protocol/waku_filter/waku_filter, ../utils/peers, ./message_store/message_store, - ../utils/requests + ../utils/requests, + ./peer_manager declarePublicCounter waku_node_messages, "number of messages received", ["type"] declarePublicGauge waku_node_filters, "number of content filter subscriptions" @@ -45,6 +45,7 @@ type # NOTE based on Eth2Node in NBC eth2_network.nim WakuNode* = ref object of RootObj + peerManager*: PeerManager switch*: Switch wakuRelay*: WakuRelay wakuStore*: WakuStore @@ -130,6 +131,7 @@ proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey, # triggerSelf = true, sign = false, # verifySignature = false).PubSub result = WakuNode( + peerManager: PeerManager.new(switch), switch: switch, rng: rng, peerInfo: peerInfo, @@ -344,11 +346,8 @@ proc dialPeer*(n: WakuNode, address: string) {.async.} = info "Dialing peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId # NOTE This is dialing on WakuRelay protocol specifically - # TODO Keep track of conn and connected state somewhere (WakuRelay?) - #p.conn = await p.switch.dial(remotePeer, WakuRelayCodec) - #p.connected = true - discard await n.switch.dial(remotePeer.peerId, remotePeer.addrs, WakuRelayCodec) - info "Post switch dial" + discard await n.peerManager.dialPeer(remotePeer, WakuRelayCodec) + info "Post peerManager dial" proc setStorePeer*(n: WakuNode, address: string) = info "dialPeer", address = address @@ -381,7 +380,7 @@ proc connectToNodes*(n: WakuNode, nodes: seq[string]) {.async.} = proc connectToNodes*(n: WakuNode, nodes: seq[PeerInfo]) {.async.} = for peerInfo in nodes: info "connectToNodes", peer = peerInfo - discard await n.switch.dial(peerInfo.peerId, peerInfo.addrs, WakuRelayCodec) + discard await n.peerManager.dialPeer(peerInfo, WakuRelayCodec) # The issue seems to be around peers not being fully connected when # trying to subscribe. So what we do is sleep to guarantee nodes are